[spark] branch master updated: [SPARK-44548][PYTHON] Add support for pandas-on-Spark DataFrame assertDataFrameEqual

2023-07-27 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7c1ad5bb60c [SPARK-44548][PYTHON] Add support for pandas-on-Spark 
DataFrame assertDataFrameEqual
7c1ad5bb60c is described below

commit 7c1ad5bb60c88c1c659c131e5119aab1f8212af5
Author: Amanda Liu 
AuthorDate: Fri Jul 28 14:41:43 2023 +0900

[SPARK-44548][PYTHON] Add support for pandas-on-Spark DataFrame 
assertDataFrameEqual

### What changes were proposed in this pull request?
This PR adds support for pandas-on-Spark DataFrames in the testing util 
`assertDataFrameEqual`.

### Why are the changes needed?
The change allows users to call the same PySpark API for both Spark and 
pandas DataFrames.
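
A minimal usage sketch (assuming Spark 3.5+ with this patch; the DataFrame contents are made up for illustration):

```python
import pyspark.pandas as ps
from pyspark.testing import assertDataFrameEqual

# With this change, both arguments may be pandas-on-Spark DataFrames,
# not only pyspark.sql.DataFrame instances.
psdf_actual = ps.DataFrame({"id": [1, 2, 3], "amount": [10.0, 20.0, 30.0]})
psdf_expected = ps.DataFrame({"id": [1, 2, 3], "amount": [10.0, 20.0, 30.0]})

# Passes silently when equal; raises a PySparkAssertionError otherwise.
assertDataFrameEqual(psdf_actual, psdf_expected)
```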

### Does this PR introduce _any_ user-facing change?
Yes, the PR affects the user-facing util `assertDataFrameEqual`.

### How was this patch tested?
Added tests to `python/pyspark/sql/tests/test_utils.py` and 
`python/pyspark/sql/tests/connect/test_utils.py` and existing pandas util tests.

Closes #42158 from asl3/pandas-or-pyspark-df.

Authored-by: Amanda Liu 
Signed-off-by: Hyukjin Kwon 
---
 dev/sparktestsupport/modules.py  |   1 +
 python/docs/source/reference/pyspark.testing.rst |   1 +
 python/pyspark/errors/error_classes.py   |  42 ++
 python/pyspark/pandas/tests/test_utils.py| 171 +++-
 python/pyspark/sql/tests/test_utils.py   |  60 ++-
 python/pyspark/testing/__init__.py   |   4 +-
 python/pyspark/testing/pandasutils.py| 506 +--
 python/pyspark/testing/utils.py  |  82 +++-
 8 files changed, 689 insertions(+), 178 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 3cfd82c3d31..79c3f8f26b1 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -514,6 +514,7 @@ pyspark_testing = Module(
 python_test_goals=[
 # doctests
 "pyspark.testing.utils",
+"pyspark.testing.pandasutils",
 ],
 )
 
diff --git a/python/docs/source/reference/pyspark.testing.rst 
b/python/docs/source/reference/pyspark.testing.rst
index 7a6b6cc0d70..96b0c72a7bb 100644
--- a/python/docs/source/reference/pyspark.testing.rst
+++ b/python/docs/source/reference/pyspark.testing.rst
@@ -26,4 +26,5 @@ Testing
 :toctree: api/
 
 assertDataFrameEqual
+assertPandasOnSparkEqual
 assertSchemaEqual
diff --git a/python/pyspark/errors/error_classes.py 
b/python/pyspark/errors/error_classes.py
index f4b643f1d32..5ecba294d0c 100644
--- a/python/pyspark/errors/error_classes.py
+++ b/python/pyspark/errors/error_classes.py
@@ -164,6 +164,42 @@ ERROR_CLASSES_JSON = """
   "Remote client cannot create a SparkContext. Create SparkSession 
instead."
 ]
   },
+  "DIFFERENT_PANDAS_DATAFRAME" : {
+"message" : [
+  "DataFrames are not almost equal:",
+  "Left: ",
+  "",
+  "Right: ",
+  ""
+]
+  },
+  "DIFFERENT_PANDAS_INDEX" : {
+"message" : [
+  "Indices are not almost equal:",
+  "Left: ",
+  "",
+  "Right: ",
+  ""
+]
+  },
+  "DIFFERENT_PANDAS_MULTIINDEX" : {
+"message" : [
+  "MultiIndices are not almost equal:",
+  "Left: ",
+  "",
+  "Right: ",
+  ""
+]
+  },
+  "DIFFERENT_PANDAS_SERIES" : {
+"message" : [
+  "Series are not almost equal:",
+  "Left: ",
+  "",
+  "Right: ",
+  ""
+]
+  },
   "DIFFERENT_ROWS" : {
 "message" : [
   ""
@@ -233,6 +269,12 @@ ERROR_CLASSES_JSON = """
   "NumPy array input should be of  dimensions."
 ]
   },
+  "INVALID_PANDAS_ON_SPARK_COMPARISON" : {
+"message" : [
+  "Expected two pandas-on-Spark DataFrames",
+  "but got actual:  and expected: "
+]
+  },
   "INVALID_PANDAS_UDF" : {
 "message" : [
   "Invalid function: "
diff --git a/python/pyspark/pandas/tests/test_utils.py 
b/python/pyspark/pandas/tests/test_utils.py
index 35ebcf17a0f..de7b0449dae 100644
--- a/python/pyspark/pandas/tests/test_utils.py
+++ b/python/pyspark/pandas/tests/test_utils.py
@@ -16,6 +16,7 @@
 #
 
 import pandas as pd
+from typing import Union
 
 from pyspark.pandas.indexes.base import Index
 from pyspark.pandas.utils import (
@@ -25,8 +26,14 @@ from pyspark.pandas.utils import (
 validate_index_loc,
 validate_mode,
 )
-from pyspark.testing.pandasutils import PandasOnSparkTestCase
+from pyspark.testing.pandasutils import (
+PandasOnSparkTestCase,
+assertPandasOnSparkEqual,
+_assert_pandas_equal,
+_assert_pandas_almost_equal,
+)
 from pyspark.testing.sqlutils import SQLTestUtils
+from pyspark.errors import PySparkAssertionError
 
 some_global_variable = 0
 
@@ -105,6 +112,168 @@ class UtilsTestsMixin:
 with 

[spark] branch branch-3.5 updated: [SPARK-44361][SQL][FOLLOWUP] Use PartitionEvaluator API in MapInBatchExec

2023-07-27 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 75679f72173 [SPARK-44361][SQL][FOLLOWUP] Use PartitionEvaluator API in 
MapInBatchExec
75679f72173 is described below

commit 75679f72173a507faf94353d37a6d223ff23c9b4
Author: Vinod KC 
AuthorDate: Fri Jul 28 12:49:13 2023 +0800

[SPARK-44361][SQL][FOLLOWUP] Use PartitionEvaluator API in MapInBatchExec

### What changes were proposed in this pull request?
This is a follow-up of https://github.com/apache/spark/pull/42024, to set 
the partition index correctly even if it's not used for now.
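
Conceptually, the change swaps a hard-coded partition index of 0 for the real index supplied by the RDD. A rough PySpark RDD illustration of the same idea (illustration only, not the patched Scala code):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
rdd = spark.sparkContext.parallelize(range(8), numSlices=4)

def eval_partition(index, rows):
    # `index` is the true partition id, analogous to the index now passed to
    # evaluatorFactory.createEvaluator().eval(index, iter) in MapInBatchExec.
    yield (index, sum(rows))

print(rdd.mapPartitionsWithIndex(eval_partition).collect())
# [(0, 1), (1, 5), (2, 9), (3, 13)]
```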

### Why are the changes needed?
future-proof

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

existing tests

Closes #42189 from vinodkc/br_SPARK-44361_Followup.

Authored-by: Vinod KC 
Signed-off-by: Wenchen Fan 
(cherry picked from commit 3cf88cb6c42e802fc4828c397df61623663be9b0)
Signed-off-by: Wenchen Fan 
---
 .../org/apache/spark/sql/execution/python/MapInBatchExec.scala| 8 
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala
index 4a47c2089d6..8db389f0266 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala
@@ -65,8 +65,8 @@ trait MapInBatchExec extends UnaryExecNode with 
PythonSQLMetrics {
   if (conf.usePartitionEvaluator) {
 rddBarrier.mapPartitionsWithEvaluator(evaluatorFactory)
   } else {
-rddBarrier.mapPartitions { iter =>
-  evaluatorFactory.createEvaluator().eval(0, iter)
+rddBarrier.mapPartitionsWithIndex { (index, iter) =>
+  evaluatorFactory.createEvaluator().eval(index, iter)
 }
   }
 } else {
@@ -74,8 +74,8 @@ trait MapInBatchExec extends UnaryExecNode with 
PythonSQLMetrics {
   if (conf.usePartitionEvaluator) {
 inputRdd.mapPartitionsWithEvaluator(evaluatorFactory)
   } else {
-inputRdd.mapPartitionsInternal { iter =>
-  evaluatorFactory.createEvaluator().eval(0, iter)
+inputRdd.mapPartitionsWithIndexInternal { (index, iter) =>
+  evaluatorFactory.createEvaluator().eval(index, iter)
 }
   }
 }





[spark] branch master updated: [SPARK-44361][SQL][FOLLOWUP] Use PartitionEvaluator API in MapInBatchExec

2023-07-27 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3cf88cb6c42 [SPARK-44361][SQL][FOLLOWUP] Use PartitionEvaluator API in 
MapInBatchExec
3cf88cb6c42 is described below

commit 3cf88cb6c42e802fc4828c397df61623663be9b0
Author: Vinod KC 
AuthorDate: Fri Jul 28 12:49:13 2023 +0800

[SPARK-44361][SQL][FOLLOWUP] Use PartitionEvaluator API in MapInBatchExec

### What changes were proposed in this pull request?
This is a follow-up of https://github.com/apache/spark/pull/42024, to set 
the partition index correctly even if it's not used for now.

### Why are the changes needed?
future-proof

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

existing tests

Closes #42189 from vinodkc/br_SPARK-44361_Followup.

Authored-by: Vinod KC 
Signed-off-by: Wenchen Fan 
---
 .../org/apache/spark/sql/execution/python/MapInBatchExec.scala| 8 
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala
index 4a47c2089d6..8db389f0266 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala
@@ -65,8 +65,8 @@ trait MapInBatchExec extends UnaryExecNode with 
PythonSQLMetrics {
   if (conf.usePartitionEvaluator) {
 rddBarrier.mapPartitionsWithEvaluator(evaluatorFactory)
   } else {
-rddBarrier.mapPartitions { iter =>
-  evaluatorFactory.createEvaluator().eval(0, iter)
+rddBarrier.mapPartitionsWithIndex { (index, iter) =>
+  evaluatorFactory.createEvaluator().eval(index, iter)
 }
   }
 } else {
@@ -74,8 +74,8 @@ trait MapInBatchExec extends UnaryExecNode with 
PythonSQLMetrics {
   if (conf.usePartitionEvaluator) {
 inputRdd.mapPartitionsWithEvaluator(evaluatorFactory)
   } else {
-inputRdd.mapPartitionsInternal { iter =>
-  evaluatorFactory.createEvaluator().eval(0, iter)
+inputRdd.mapPartitionsWithIndexInternal { (index, iter) =>
+  evaluatorFactory.createEvaluator().eval(index, iter)
 }
   }
 }





[spark] branch branch-3.5 updated: [SPARK-42098][SQL] Fix ResolveInlineTables can not handle with RuntimeReplaceable expression

2023-07-27 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 81d56603b30 [SPARK-42098][SQL] Fix ResolveInlineTables can not handle 
with RuntimeReplaceable expression
81d56603b30 is described below

commit 81d56603b30375790ca0203ed877d5bcf0924c77
Author: Jia Fan 
AuthorDate: Fri Jul 28 12:44:02 2023 +0800

[SPARK-42098][SQL] Fix ResolveInlineTables can not handle with 
RuntimeReplaceable expression

### What changes were proposed in this pull request?
Fix `ResolveInlineTables` so that it can handle `RuntimeReplaceable` 
expressions, e.g.:
```sql
select * from values (try_add(5, 0))
```
Previously, the following error was thrown:
```java
[INVALID_INLINE_TABLE.CANNOT_EVALUATE_EXPRESSION_IN_INLINE_TABLE] Invalid 
inline table. Cannot evaluate the expression "try_add(5, 0)" in inline table 
definition.; line 1 pos 22
```

### Why are the changes needed?
Fix the bug where `ResolveInlineTables` cannot handle `RuntimeReplaceable` 
expressions.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added a new test.

Closes #42110 from Hisoka-X/SPARK-42098_inline_table_replaceable.

Authored-by: Jia Fan 
Signed-off-by: Wenchen Fan 
(cherry picked from commit f235c9f622ddeb4c6a5cce8903130709dcd2217c)
Signed-off-by: Wenchen Fan 
---
 .../catalyst/analysis/ResolveInlineTables.scala|  6 --
 .../analyzer-results/inline-table.sql.out  | 21 +++
 .../resources/sql-tests/inputs/inline-table.sql|  5 +
 .../sql-tests/results/inline-table.sql.out | 24 ++
 4 files changed, 54 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
index cf706171cd9..c203f9fa39d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
@@ -21,6 +21,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.AliasHelper
+import org.apache.spark.sql.catalyst.optimizer.ReplaceExpressions
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.AlwaysProcess
@@ -36,8 +37,9 @@ object ResolveInlineTables extends Rule[LogicalPlan] with 
CastSupport with Alias
 AlwaysProcess.fn, ruleId) {
 case table: UnresolvedInlineTable if table.expressionsResolved =>
   validateInputDimension(table)
-  validateInputEvaluable(table)
-  convert(table)
+  val newTable = 
ReplaceExpressions(table).asInstanceOf[UnresolvedInlineTable]
+  validateInputEvaluable(newTable)
+  convert(newTable)
   }
 
   /**
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out
index f3e6eb4d8dc..2a17f092a06 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out
@@ -220,3 +220,24 @@ select * from values (timestamp('1991-12-06 00:00:00.0'), 
array(timestamp('1991-
 Project [a#x, b#x]
 +- SubqueryAlias data
+- LocalRelation [a#x, b#x]
+
+
+-- !query
+select * from values (try_add(5, 0))
+-- !query analysis
+Project [col1#x]
++- LocalRelation [col1#x]
+
+
+-- !query
+select * from values (try_divide(5, 0))
+-- !query analysis
+Project [col1#x]
++- LocalRelation [col1#x]
+
+
+-- !query
+select * from values (10 + try_divide(5, 0))
+-- !query analysis
+Project [col1#x]
++- LocalRelation [col1#x]
diff --git a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql 
b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql
index fd8bb2d837d..6867248f576 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql
@@ -55,3 +55,8 @@ select * from values ("one", count(1)), ("two", 2) as data(a, 
b);
 
 -- string to timestamp
 select * from values (timestamp('1991-12-06 00:00:00.0'), 
array(timestamp('1991-12-06 01:00:00.0'), timestamp('1991-12-06 12:00:00.0'))) 
as data(a, b);
+
+-- ReplaceExpressions as row
+select * from values (try_add(5, 0));
+select * from values (try_divide(5, 0));
+select * from values (10 + try_divide(5, 0));
diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out 

[spark] branch master updated: [SPARK-42098][SQL] Fix ResolveInlineTables can not handle with RuntimeReplaceable expression

2023-07-27 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f235c9f622d [SPARK-42098][SQL] Fix ResolveInlineTables can not handle 
with RuntimeReplaceable expression
f235c9f622d is described below

commit f235c9f622ddeb4c6a5cce8903130709dcd2217c
Author: Jia Fan 
AuthorDate: Fri Jul 28 12:44:02 2023 +0800

[SPARK-42098][SQL] Fix ResolveInlineTables can not handle with 
RuntimeReplaceable expression

### What changes were proposed in this pull request?
Fix `ResolveInlineTables` so that it can handle `RuntimeReplaceable` 
expressions, e.g.:
```sql
select * from values (try_add(5, 0))
```
Previously, the following error was thrown:
```java
[INVALID_INLINE_TABLE.CANNOT_EVALUATE_EXPRESSION_IN_INLINE_TABLE] Invalid 
inline table. Cannot evaluate the expression "try_add(5, 0)" in inline table 
definition.; line 1 pos 22
```

### Why are the changes needed?
Fix the bug where `ResolveInlineTables` cannot handle `RuntimeReplaceable` 
expressions.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added a new test.

Closes #42110 from Hisoka-X/SPARK-42098_inline_table_replaceable.

Authored-by: Jia Fan 
Signed-off-by: Wenchen Fan 
---
 .../catalyst/analysis/ResolveInlineTables.scala|  6 --
 .../analyzer-results/inline-table.sql.out  | 21 +++
 .../resources/sql-tests/inputs/inline-table.sql|  5 +
 .../sql-tests/results/inline-table.sql.out | 24 ++
 4 files changed, 54 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
index cf706171cd9..c203f9fa39d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
@@ -21,6 +21,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.AliasHelper
+import org.apache.spark.sql.catalyst.optimizer.ReplaceExpressions
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.AlwaysProcess
@@ -36,8 +37,9 @@ object ResolveInlineTables extends Rule[LogicalPlan] with 
CastSupport with Alias
 AlwaysProcess.fn, ruleId) {
 case table: UnresolvedInlineTable if table.expressionsResolved =>
   validateInputDimension(table)
-  validateInputEvaluable(table)
-  convert(table)
+  val newTable = 
ReplaceExpressions(table).asInstanceOf[UnresolvedInlineTable]
+  validateInputEvaluable(newTable)
+  convert(newTable)
   }
 
   /**
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out
index f3e6eb4d8dc..2a17f092a06 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out
@@ -220,3 +220,24 @@ select * from values (timestamp('1991-12-06 00:00:00.0'), 
array(timestamp('1991-
 Project [a#x, b#x]
 +- SubqueryAlias data
+- LocalRelation [a#x, b#x]
+
+
+-- !query
+select * from values (try_add(5, 0))
+-- !query analysis
+Project [col1#x]
++- LocalRelation [col1#x]
+
+
+-- !query
+select * from values (try_divide(5, 0))
+-- !query analysis
+Project [col1#x]
++- LocalRelation [col1#x]
+
+
+-- !query
+select * from values (10 + try_divide(5, 0))
+-- !query analysis
+Project [col1#x]
++- LocalRelation [col1#x]
diff --git a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql 
b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql
index fd8bb2d837d..6867248f576 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql
@@ -55,3 +55,8 @@ select * from values ("one", count(1)), ("two", 2) as data(a, 
b);
 
 -- string to timestamp
 select * from values (timestamp('1991-12-06 00:00:00.0'), 
array(timestamp('1991-12-06 01:00:00.0'), timestamp('1991-12-06 12:00:00.0'))) 
as data(a, b);
+
+-- ReplaceExpressions as row
+select * from values (try_add(5, 0));
+select * from values (try_divide(5, 0));
+select * from values (10 + try_divide(5, 0));
diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out 
b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
index e735dd0dd5a..709d7ab73f6 100644
--- 

[spark] branch branch-3.5 updated: [SPARK-44287][SQL][FOLLOWUP] Set partition index correctly

2023-07-27 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 4f7187885a6 [SPARK-44287][SQL][FOLLOWUP] Set partition index correctly
4f7187885a6 is described below

commit 4f7187885a6c6a0c944af97a493a42dabca3cc1b
Author: Wenchen Fan 
AuthorDate: Fri Jul 28 11:36:59 2023 +0800

[SPARK-44287][SQL][FOLLOWUP] Set partition index correctly

### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/41839, to set 
the partition index correctly even if it's not used for now. It also contains a 
few code cleanup.

### Why are the changes needed?

future-proof

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #42185 from cloud-fan/follow.

Authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
(cherry picked from commit bf1bbc514ebf25fd8041f566dc9d13593c307931)
Signed-off-by: Wenchen Fan 
---
 .../org/apache/spark/sql/execution/Columnar.scala  | 41 ++
 .../spark/sql/SparkSessionExtensionSuite.scala | 64 ++
 .../spark/sql/execution/SparkPlanSuite.scala   | 26 -
 3 files changed, 57 insertions(+), 74 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
index a2029816c23..fc879f7e98f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -85,20 +85,16 @@ case class ColumnarToRowExec(child: SparkPlan) extends 
ColumnarToRowTransition w
   )
 
   override def doExecute(): RDD[InternalRow] = {
-val numOutputRows = longMetric("numOutputRows")
-val numInputBatches = longMetric("numInputBatches")
-val evaluatorFactory =
-  new ColumnarToRowEvaluatorFactory(
-child.output,
-numOutputRows,
-numInputBatches)
-
+val evaluatorFactory = new ColumnarToRowEvaluatorFactory(
+  child.output,
+  longMetric("numOutputRows"),
+  longMetric("numInputBatches"))
 if (conf.usePartitionEvaluator) {
   child.executeColumnar().mapPartitionsWithEvaluator(evaluatorFactory)
 } else {
-  child.executeColumnar().mapPartitionsInternal { batches =>
+  child.executeColumnar().mapPartitionsWithIndexInternal { (index, 
batches) =>
 val evaluator = evaluatorFactory.createEvaluator()
-evaluator.eval(0, batches)
+evaluator.eval(index, batches)
   }
 }
   }
@@ -454,25 +450,20 @@ case class RowToColumnarExec(child: SparkPlan) extends 
RowToColumnarTransition {
   )
 
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
-val numInputRows = longMetric("numInputRows")
-val numOutputBatches = longMetric("numOutputBatches")
-// Instead of creating a new config we are reusing columnBatchSize. In the 
future if we do
-// combine with some of the Arrow conversion tools we will need to unify 
some of the configs.
-val numRows = conf.columnBatchSize
-val evaluatorFactory =
-  new RowToColumnarEvaluatorFactory(
-conf.offHeapColumnVectorEnabled,
-numRows,
-schema,
-numInputRows,
-numOutputBatches)
-
+val evaluatorFactory = new RowToColumnarEvaluatorFactory(
+  conf.offHeapColumnVectorEnabled,
+  // Instead of creating a new config we are reusing columnBatchSize. In 
the future if we do
+  // combine with some of the Arrow conversion tools we will need to unify 
some of the configs.
+  conf.columnBatchSize,
+  schema,
+  longMetric("numInputRows"),
+  longMetric("numOutputBatches"))
 if (conf.usePartitionEvaluator) {
   child.execute().mapPartitionsWithEvaluator(evaluatorFactory)
 } else {
-  child.execute().mapPartitionsInternal { rowIterator =>
+  child.execute().mapPartitionsWithIndexInternal { (index, rowIterator) =>
 val evaluator = evaluatorFactory.createEvaluator()
-evaluator.eval(0, rowIterator)
+evaluator.eval(index, rowIterator)
   }
 }
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 043a3b1a7e5..d4a871c00a1 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -279,40 +279,36 @@ class SparkSessionExtensionSuite extends SparkFunSuite 
with SQLHelper {
 }
 withSession(extensions) { session =>
   session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, enableAQE)
-  

[spark] branch master updated: [SPARK-44287][SQL][FOLLOWUP] Set partition index correctly

2023-07-27 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new bf1bbc514eb [SPARK-44287][SQL][FOLLOWUP] Set partition index correctly
bf1bbc514eb is described below

commit bf1bbc514ebf25fd8041f566dc9d13593c307931
Author: Wenchen Fan 
AuthorDate: Fri Jul 28 11:36:59 2023 +0800

[SPARK-44287][SQL][FOLLOWUP] Set partition index correctly

### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/41839, to set 
the partition index correctly even if it's not used for now. It also contains a 
few code cleanup.

### Why are the changes needed?

future-proof

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #42185 from cloud-fan/follow.

Authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
---
 .../org/apache/spark/sql/execution/Columnar.scala  | 41 ++
 .../spark/sql/SparkSessionExtensionSuite.scala | 64 ++
 .../spark/sql/execution/SparkPlanSuite.scala   | 26 -
 3 files changed, 57 insertions(+), 74 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
index a2029816c23..fc879f7e98f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -85,20 +85,16 @@ case class ColumnarToRowExec(child: SparkPlan) extends 
ColumnarToRowTransition w
   )
 
   override def doExecute(): RDD[InternalRow] = {
-val numOutputRows = longMetric("numOutputRows")
-val numInputBatches = longMetric("numInputBatches")
-val evaluatorFactory =
-  new ColumnarToRowEvaluatorFactory(
-child.output,
-numOutputRows,
-numInputBatches)
-
+val evaluatorFactory = new ColumnarToRowEvaluatorFactory(
+  child.output,
+  longMetric("numOutputRows"),
+  longMetric("numInputBatches"))
 if (conf.usePartitionEvaluator) {
   child.executeColumnar().mapPartitionsWithEvaluator(evaluatorFactory)
 } else {
-  child.executeColumnar().mapPartitionsInternal { batches =>
+  child.executeColumnar().mapPartitionsWithIndexInternal { (index, 
batches) =>
 val evaluator = evaluatorFactory.createEvaluator()
-evaluator.eval(0, batches)
+evaluator.eval(index, batches)
   }
 }
   }
@@ -454,25 +450,20 @@ case class RowToColumnarExec(child: SparkPlan) extends 
RowToColumnarTransition {
   )
 
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
-val numInputRows = longMetric("numInputRows")
-val numOutputBatches = longMetric("numOutputBatches")
-// Instead of creating a new config we are reusing columnBatchSize. In the 
future if we do
-// combine with some of the Arrow conversion tools we will need to unify 
some of the configs.
-val numRows = conf.columnBatchSize
-val evaluatorFactory =
-  new RowToColumnarEvaluatorFactory(
-conf.offHeapColumnVectorEnabled,
-numRows,
-schema,
-numInputRows,
-numOutputBatches)
-
+val evaluatorFactory = new RowToColumnarEvaluatorFactory(
+  conf.offHeapColumnVectorEnabled,
+  // Instead of creating a new config we are reusing columnBatchSize. In 
the future if we do
+  // combine with some of the Arrow conversion tools we will need to unify 
some of the configs.
+  conf.columnBatchSize,
+  schema,
+  longMetric("numInputRows"),
+  longMetric("numOutputBatches"))
 if (conf.usePartitionEvaluator) {
   child.execute().mapPartitionsWithEvaluator(evaluatorFactory)
 } else {
-  child.execute().mapPartitionsInternal { rowIterator =>
+  child.execute().mapPartitionsWithIndexInternal { (index, rowIterator) =>
 val evaluator = evaluatorFactory.createEvaluator()
-evaluator.eval(0, rowIterator)
+evaluator.eval(index, rowIterator)
   }
 }
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 043a3b1a7e5..d4a871c00a1 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -279,40 +279,36 @@ class SparkSessionExtensionSuite extends SparkFunSuite 
with SQLHelper {
 }
 withSession(extensions) { session =>
   session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, enableAQE)
-  Seq(true, false).foreach { enableEvaluator =>
-withSQLConf(SQLConf.USE_PARTITION_EVALUATOR.key -> 

[spark] branch branch-3.5 updated: [SPARK-44558][CONNECT][PYTHON] Export Spark Log Level

2023-07-27 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 4f90c3232ca [SPARK-44558][CONNECT][PYTHON] Export Spark Log Level
4f90c3232ca is described below

commit 4f90c3232cabd8bef05f46fc863d7a8f1f968ee4
Author: Alice Sayutina 
AuthorDate: Fri Jul 28 11:34:16 2023 +0900

[SPARK-44558][CONNECT][PYTHON] Export Spark Log Level

### What changes were proposed in this pull request?

Export Spark Connect Log Level in pyspark.

### Why are the changes needed?

This is convenient for software that depends on Spark Connect, so that debug 
logging can be enabled in a single place.
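
A minimal sketch of how a dependent application might use this (assuming the Spark Connect client dependencies are installed; the environment variable must be set before the client module is imported, since the logger is configured at import time):

```python
import logging
import os

os.environ["SPARK_CONNECT_LOG_LEVEL"] = "debug"

from pyspark.sql.connect.client.core import getLogLevel

level = getLogLevel()  # an integer such as logging.DEBUG, or None if logging is disabled
print(level == logging.DEBUG)
```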

### Does this PR introduce _any_ user-facing change?

Yes, a new API, `getLogLevel`, is added.

### How was this patch tested?

Checked that it works from the shell.

Closes #42175 from cdkrot/spark_connect_debug_logs.

Lead-authored-by: Alice Sayutina 
Co-authored-by: Alice Sayutina 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 9c7f613f0f10f4ee90bb29920157a32017696a43)
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/connect/client/core.py | 17 -
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/connect/client/core.py 
b/python/pyspark/sql/connect/client/core.py
index 482482123c0..53d3d10d647 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -17,6 +17,7 @@
 __all__ = [
 "ChannelBuilder",
 "SparkConnectClient",
+"getLogLevel",
 ]
 
 from pyspark.sql.connect.utils import check_dependencies
@@ -104,7 +105,7 @@ def _configure_logging() -> logging.Logger:
 
 # Check the environment variables for log levels:
 if "SPARK_CONNECT_LOG_LEVEL" in os.environ:
-logger.setLevel(os.getenv("SPARK_CONNECT_LOG_LEVEL", "error").upper())
+logger.setLevel(os.environ["SPARK_CONNECT_LOG_LEVEL"].upper())
 else:
 logger.disabled = True
 return logger
@@ -114,6 +115,20 @@ def _configure_logging() -> logging.Logger:
 logger = _configure_logging()
 
 
+def getLogLevel() -> Optional[int]:
+"""
+This returns this log level as integer, or none (if no logging is enabled).
+
+Spark Connect logging can be configured with environment variable 
'SPARK_CONNECT_LOG_LEVEL'
+
+.. versionadded:: 3.5.0
+"""
+
+if not logger.disabled:
+return logger.level
+return None
+
+
 class ChannelBuilder:
 """
 This is a helper class that is used to create a GRPC channel based on the 
given





[spark] branch master updated: [SPARK-44558][CONNECT][PYTHON] Export Spark Log Level

2023-07-27 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 9c7f613f0f1 [SPARK-44558][CONNECT][PYTHON] Export Spark Log Level
9c7f613f0f1 is described below

commit 9c7f613f0f10f4ee90bb29920157a32017696a43
Author: Alice Sayutina 
AuthorDate: Fri Jul 28 11:34:16 2023 +0900

[SPARK-44558][CONNECT][PYTHON] Export Spark Log Level

### What changes were proposed in this pull request?

Export Spark Connect Log Level in pyspark.

### Why are the changes needed?

This is convenient for software that depends on Spark Connect, so that debug 
logging can be enabled in a single place.

### Does this PR introduce _any_ user-facing change?

Yes, a new API, `getLogLevel`, is added.

### How was this patch tested?

Checked that it works from the shell.

Closes #42175 from cdkrot/spark_connect_debug_logs.

Lead-authored-by: Alice Sayutina 
Co-authored-by: Alice Sayutina 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/connect/client/core.py | 17 -
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/connect/client/core.py 
b/python/pyspark/sql/connect/client/core.py
index 473a00f4001..0288bbc6508 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -17,6 +17,7 @@
 __all__ = [
 "ChannelBuilder",
 "SparkConnectClient",
+"getLogLevel",
 ]
 
 from pyspark.sql.connect.utils import check_dependencies
@@ -104,7 +105,7 @@ def _configure_logging() -> logging.Logger:
 
 # Check the environment variables for log levels:
 if "SPARK_CONNECT_LOG_LEVEL" in os.environ:
-logger.setLevel(os.getenv("SPARK_CONNECT_LOG_LEVEL", "error").upper())
+logger.setLevel(os.environ["SPARK_CONNECT_LOG_LEVEL"].upper())
 else:
 logger.disabled = True
 return logger
@@ -114,6 +115,20 @@ def _configure_logging() -> logging.Logger:
 logger = _configure_logging()
 
 
+def getLogLevel() -> Optional[int]:
+"""
+This returns this log level as integer, or none (if no logging is enabled).
+
+Spark Connect logging can be configured with environment variable 
'SPARK_CONNECT_LOG_LEVEL'
+
+.. versionadded:: 3.5.0
+"""
+
+if not logger.disabled:
+return logger.level
+return None
+
+
 class ChannelBuilder:
 """
 This is a helper class that is used to create a GRPC channel based on the 
given





[spark] branch branch-3.5 updated: [SPARK-44264][PYTHON] E2E Testing for Deepspeed

2023-07-27 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 3c6971149dc [SPARK-44264][PYTHON] E2E Testing for Deepspeed
3c6971149dc is described below

commit 3c6971149dcad55de6aa5eeeff29cb54a07acb8f
Author: Mathew Jacob 
AuthorDate: Fri Jul 28 09:14:45 2023 +0900

[SPARK-44264][PYTHON] E2E Testing for Deepspeed

### What Changes Were Proposed
This PR adds some end-to-end tests for the DeepspeedTorchDistributor. Because 
Deepspeed currently has limited CPU support, we use very simple proxy functions 
to test that the command works. In the future, once Deepspeed supports CPU 
better, these end-to-end tests should migrate to more realistic Deepspeed 
workloads, such as those described in the PR comments.

### Why Do We Need These Changes
Previously, we only had unit tests for helper functions. These tests exercise 
actual workloads, running DeepspeedTorchDistributor end to end as a user would.

### Any User Facing Changes
No, these are end to end tests.

### How Was This Tested
Running the tests and seeing if they pass.

Closes #42118 from mathewjacob1002/gpu_e2e_tests.

Authored-by: Mathew Jacob 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 9ae33b82d78d65f58acb253c0b710b5807f9912d)
Signed-off-by: Hyukjin Kwon 
---
 dev/requirements.txt   |   3 +
 dev/tox.ini|   1 +
 python/mypy.ini|   3 +
 .../deepspeed/tests/test_deepspeed_distributor.py  | 134 -
 4 files changed, 139 insertions(+), 2 deletions(-)

diff --git a/dev/requirements.txt b/dev/requirements.txt
index f5fe5fa071f..38a9b244710 100644
--- a/dev/requirements.txt
+++ b/dev/requirements.txt
@@ -65,3 +65,6 @@ torch
 torchvision
 torcheval
 
+# DeepspeedTorchDistributor dependencies
+deepspeed
+
diff --git a/dev/tox.ini b/dev/tox.ini
index e2a77786ed4..438f82fec1e 100644
--- a/dev/tox.ini
+++ b/dev/tox.ini
@@ -46,6 +46,7 @@ exclude =
 */target/*,
 docs/.local_ruby_bundle/,
 *python/pyspark/cloudpickle/*.py,
+*python/pyspark/ml/deepspeed/tests/*.py
 *python/docs/build/*,
 *python/docs/source/conf.py,
 *python/.eggs/*,
diff --git a/python/mypy.ini b/python/mypy.ini
index a845cd88bd8..4d1fc3ceb66 100644
--- a/python/mypy.ini
+++ b/python/mypy.ini
@@ -88,6 +88,9 @@ ignore_errors = True
 [mypy-pyspark.ml.torch.tests.*]
 ignore_errors = True
 
+[mypy-pyspark.ml.deepspeed.tests.*]
+ignore_errors = True
+
 [mypy-pyspark.mllib.tests.*]
 ignore_errors = True
 
diff --git a/python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py 
b/python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py
index 4c4606699a3..590e541c384 100644
--- a/python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py
+++ b/python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py
@@ -1,4 +1,4 @@
-#
+# mypy: ignore-errors
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -14,12 +14,28 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from contextlib import contextmanager
 import os
+import shutil
 import sys
-from typing import Any, Tuple, Dict
+import textwrap
+from typing import Any, Callable, Dict, Tuple
 import unittest
 
+from pyspark import SparkConf, SparkContext
 from pyspark.ml.deepspeed.deepspeed_distributor import 
DeepspeedTorchDistributor
+from pyspark.sql import SparkSession
+from pyspark.ml.torch.tests.test_distributor import (
+get_local_mode_conf,
+set_up_test_dirs,
+get_distributed_mode_conf,
+)
+
+have_deepspeed = True
+try:
+import deepspeed  # noqa: F401
+except ImportError:
+have_deepspeed = False
 
 
 class DeepspeedTorchDistributorUnitTests(unittest.TestCase):
@@ -164,6 +180,120 @@ class 
DeepspeedTorchDistributorUnitTests(unittest.TestCase):
 self.assertEqual(distributed_cmd_args_expected, 
distributed_command_with_args)
 
 
+def _create_basic_function() -> Callable:
+# TODO: swap out with better test function
+# once Deepspeed better supports CPU
+def pythagoras(leg1: float, leg2: float) -> float:
+import deepspeed
+
+print(deepspeed.__version__)
+return (leg1 * leg1 + leg2 * leg2) ** 0.5
+
+return pythagoras
+
+
+@contextmanager
+def _create_pytorch_training_test_file():
+# Note: when Deepspeed CPU support becomes better,
+# switch in more realistic training files using Deepspeed
+# optimizations + constructs
+str_to_write = textwrap.dedent(
+""" 
+import sys
+ 

[spark] branch master updated: [SPARK-44264][PYTHON] E2E Testing for Deepspeed

2023-07-27 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 9ae33b82d78 [SPARK-44264][PYTHON] E2E Testing for Deepspeed
9ae33b82d78 is described below

commit 9ae33b82d78d65f58acb253c0b710b5807f9912d
Author: Mathew Jacob 
AuthorDate: Fri Jul 28 09:14:45 2023 +0900

[SPARK-44264][PYTHON] E2E Testing for Deepspeed

### What Changes Were Proposed
This PR adds some end-to-end tests for the DeepspeedTorchDistributor. Because 
Deepspeed currently has limited CPU support, we use very simple proxy functions 
to test that the command works. In the future, once Deepspeed supports CPU 
better, these end-to-end tests should migrate to more realistic Deepspeed 
workloads, such as those described in the PR comments.

### Why Do We Need These Changes
Previously, we only had unit tests for helper functions. These tests exercise 
actual workloads, running DeepspeedTorchDistributor end to end as a user would.

### Any User Facing Changes
No, these are end to end tests.

### How Was This Tested
Running the tests and seeing if they pass.

Closes #42118 from mathewjacob1002/gpu_e2e_tests.

Authored-by: Mathew Jacob 
Signed-off-by: Hyukjin Kwon 
---
 dev/requirements.txt   |   3 +
 dev/tox.ini|   1 +
 python/mypy.ini|   3 +
 .../deepspeed/tests/test_deepspeed_distributor.py  | 134 -
 4 files changed, 139 insertions(+), 2 deletions(-)

diff --git a/dev/requirements.txt b/dev/requirements.txt
index f5fe5fa071f..38a9b244710 100644
--- a/dev/requirements.txt
+++ b/dev/requirements.txt
@@ -65,3 +65,6 @@ torch
 torchvision
 torcheval
 
+# DeepspeedTorchDistributor dependencies
+deepspeed
+
diff --git a/dev/tox.ini b/dev/tox.ini
index e2a77786ed4..438f82fec1e 100644
--- a/dev/tox.ini
+++ b/dev/tox.ini
@@ -46,6 +46,7 @@ exclude =
 */target/*,
 docs/.local_ruby_bundle/,
 *python/pyspark/cloudpickle/*.py,
+*python/pyspark/ml/deepspeed/tests/*.py
 *python/docs/build/*,
 *python/docs/source/conf.py,
 *python/.eggs/*,
diff --git a/python/mypy.ini b/python/mypy.ini
index a845cd88bd8..4d1fc3ceb66 100644
--- a/python/mypy.ini
+++ b/python/mypy.ini
@@ -88,6 +88,9 @@ ignore_errors = True
 [mypy-pyspark.ml.torch.tests.*]
 ignore_errors = True
 
+[mypy-pyspark.ml.deepspeed.tests.*]
+ignore_errors = True
+
 [mypy-pyspark.mllib.tests.*]
 ignore_errors = True
 
diff --git a/python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py 
b/python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py
index 4c4606699a3..590e541c384 100644
--- a/python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py
+++ b/python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py
@@ -1,4 +1,4 @@
-#
+# mypy: ignore-errors
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -14,12 +14,28 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from contextlib import contextmanager
 import os
+import shutil
 import sys
-from typing import Any, Tuple, Dict
+import textwrap
+from typing import Any, Callable, Dict, Tuple
 import unittest
 
+from pyspark import SparkConf, SparkContext
 from pyspark.ml.deepspeed.deepspeed_distributor import 
DeepspeedTorchDistributor
+from pyspark.sql import SparkSession
+from pyspark.ml.torch.tests.test_distributor import (
+get_local_mode_conf,
+set_up_test_dirs,
+get_distributed_mode_conf,
+)
+
+have_deepspeed = True
+try:
+import deepspeed  # noqa: F401
+except ImportError:
+have_deepspeed = False
 
 
 class DeepspeedTorchDistributorUnitTests(unittest.TestCase):
@@ -164,6 +180,120 @@ class 
DeepspeedTorchDistributorUnitTests(unittest.TestCase):
 self.assertEqual(distributed_cmd_args_expected, 
distributed_command_with_args)
 
 
+def _create_basic_function() -> Callable:
+# TODO: swap out with better test function
+# once Deepspeed better supports CPU
+def pythagoras(leg1: float, leg2: float) -> float:
+import deepspeed
+
+print(deepspeed.__version__)
+return (leg1 * leg1 + leg2 * leg2) ** 0.5
+
+return pythagoras
+
+
+@contextmanager
+def _create_pytorch_training_test_file():
+# Note: when Deepspeed CPU support becomes better,
+# switch in more realistic training files using Deepspeed
+# optimizations + constructs
+str_to_write = textwrap.dedent(
+""" 
+import sys
+def pythagorean_thm(x : int, y: int): # type: ignore 
+import deepspeed # type: ignore
+   

[spark] branch master updated: [SPARK-44198][CORE] Support propagation of the log level to the executors

2023-07-27 Thread attilapiros
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5fc90fbd4e3 [SPARK-44198][CORE] Support propagation of the log level 
to the executors
5fc90fbd4e3 is described below

commit 5fc90fbd4e3235fbcf038f4725037321b8234d94
Author: Vinod KC 
AuthorDate: Thu Jul 27 16:39:33 2023 -0700

[SPARK-44198][CORE] Support propagation of the log level to the executors

### What changes were proposed in this pull request?

Currently, the **sc.setLogLevel()** method only sets the log level on the 
Spark driver, and the desired log level is not reflected on the executors. With 
_--conf **spark.log.level**_ or **sc.setLogLevel()**, Spark allows tuning the 
log level in the driver process, but the executors do not pick up that log 
level.

### Why are the changes needed?
This inconsistency can lead to difficulties in debugging and monitoring 
Spark applications, as log messages from the executors may not align with the 
log level set in the user code.
This PR propagates log level changes to the executors when 
sc.setLogLevel() is called, and sends the current log level when a new executor 
registers.
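
A minimal sketch of the user-facing behavior (assuming the executor log-level sync configuration introduced by this change is enabled):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("log-level-sync-demo").getOrCreate()
sc = spark.sparkContext

# Previously this only affected the driver process; with this change, running
# executors are told to switch to DEBUG, and newly registered executors pick up
# the current level.
sc.setLogLevel("DEBUG")
```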

### Does this PR introduce _any_ user-facing change?
No, but with this PR, both the driver and the executors will show the same log level.

### How was this patch tested?

Tested manually to verify the same log levels on both driver and executor

Closes #41746 from vinodkc/br_support_setloglevel_executors.

Authored-by: Vinod KC 
Signed-off-by: attilapiros 
---
 .../main/scala/org/apache/spark/SparkContext.scala | 11 --
 .../executor/CoarseGrainedExecutorBackend.scala|  4 
 .../org/apache/spark/internal/config/package.scala |  8 +++
 .../apache/spark/scheduler/SchedulerBackend.scala  |  1 +
 .../cluster/CoarseGrainedClusterMessage.scala  |  7 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala| 20 -
 .../main/scala/org/apache/spark/util/Utils.scala   | 25 +++---
 7 files changed, 69 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 26fdb86d299..f48cb32b319 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -40,7 +40,6 @@ import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, 
BytesWritable, Doub
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, 
SequenceFileInputFormat, TextInputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => 
NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => 
NewFileInputFormat}
-import org.apache.logging.log4j.Level
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
@@ -396,7 +395,10 @@ class SparkContext(config: SparkConf) extends Logging {
 require(SparkContext.VALID_LOG_LEVELS.contains(upperCased),
   s"Supplied level $logLevel did not match one of:" +
 s" ${SparkContext.VALID_LOG_LEVELS.mkString(",")}")
-Utils.setLogLevel(Level.toLevel(upperCased))
+Utils.setLogLevelIfNeeded(upperCased)
+if (conf.get(EXECUTOR_ALLOW_SYNC_LOG_LEVEL) && _schedulerBackend != null) {
+  _schedulerBackend.updateExecutorsLogLevel(upperCased)
+}
   }
 
   try {
@@ -585,6 +587,11 @@ class SparkContext(config: SparkConf) extends Logging {
 _dagScheduler = new DAGScheduler(this)
 _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
 
+if (_conf.get(EXECUTOR_ALLOW_SYNC_LOG_LEVEL)) {
+  _conf.get(SPARK_LOG_LEVEL)
+.foreach(logLevel => 
_schedulerBackend.updateExecutorsLogLevel(logLevel))
+}
+
 val _executorMetricsSource =
   if (_conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) {
 Some(new ExecutorMetricsSource)
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index ab238626efe..da009f5addb 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -177,6 +177,8 @@ private[spark] class CoarseGrainedExecutorBackend(
 case NonFatal(e) =>
   exitExecutor(1, "Unable to create executor due to " + e.getMessage, 
e)
   }
+case UpdateExecutorLogLevel(newLogLevel) =>
+  Utils.setLogLevelIfNeeded(newLogLevel)
 
 case LaunchTask(data) =>
   if (executor == null) {
@@ -473,6 +475,8 @@ private[spark] object CoarseGrainedExecutorBackend extends 
Logging {
   }
 
   

[spark] branch branch-3.5 updated: [SPARK-44425][CONNECT] Validate that user provided sessionId is an UUID

2023-07-27 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new cbcd85c5e57 [SPARK-44425][CONNECT] Validate that user provided 
sessionId is an UUID
cbcd85c5e57 is described below

commit cbcd85c5e57695f3992eaf694d61be86f84449c3
Author: Juliusz Sompolski 
AuthorDate: Fri Jul 28 07:55:02 2023 +0900

[SPARK-44425][CONNECT] Validate that user provided sessionId is an UUID

### What changes were proposed in this pull request?

We want to validate that the user-provided sessionId is a UUID. The existing Spark 
Connect Python and Scala clients already do that, and we would like to depend on it 
being in this format moving forward, just like we already validate that 
operationId is a UUID.

### Why are the changes needed?

Validate what's already assumed.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing CI.

Closes #42150 from juliuszsompolski/SPARK-44425.

Authored-by: Juliusz Sompolski 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit a3bd477a6d8c317ee1e9a6aae6ebd2ef4fc67cce)
Signed-off-by: Hyukjin Kwon 
---
 .../src/main/protobuf/spark/connect/base.proto |  6 ++
 .../sql/connect/service/SparkConnectService.scala  | 12 ++-
 .../connect/artifact/ArtifactManagerSuite.scala| 15 +-
 .../connect/planner/SparkConnectServiceSuite.scala | 23 +-
 .../connect/service/AddArtifactsHandlerSuite.scala | 13 +++-
 python/pyspark/sql/connect/proto/base_pb2.pyi  |  6 ++
 6 files changed, 55 insertions(+), 20 deletions(-)

diff --git 
a/connector/connect/common/src/main/protobuf/spark/connect/base.proto 
b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
index d935ae65328..21fd167f6b5 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
@@ -63,6 +63,7 @@ message AnalyzePlanRequest {
   // The session_id specifies a spark session for a user id (which is specified
   // by user_context.user_id). The session_id is set by the client to be able 
to
   // collate streaming responses from different queries within the dedicated 
session.
+  // The id should be an UUID string of the format 
`00112233-4455-6677-8899-aabbccddeeff`
   string session_id = 1;
 
   // (Required) User context
@@ -273,6 +274,7 @@ message ExecutePlanRequest {
   // The session_id specifies a spark session for a user id (which is specified
   // by user_context.user_id). The session_id is set by the client to be able 
to
   // collate streaming responses from different queries within the dedicated 
session.
+  // The id should be an UUID string of the format 
`00112233-4455-6677-8899-aabbccddeeff`
   string session_id = 1;
 
   // (Required) User context
@@ -407,6 +409,7 @@ message ConfigRequest {
   // The session_id specifies a spark session for a user id (which is specified
   // by user_context.user_id). The session_id is set by the client to be able 
to
   // collate streaming responses from different queries within the dedicated 
session.
+  // The id should be an UUID string of the format 
`00112233-4455-6677-8899-aabbccddeeff`
   string session_id = 1;
 
   // (Required) User context
@@ -492,6 +495,7 @@ message AddArtifactsRequest {
   // The session_id specifies a spark session for a user id (which is specified
   // by user_context.user_id). The session_id is set by the client to be able 
to
   // collate streaming responses from different queries within the dedicated 
session.
+  // The id should be an UUID string of the format 
`00112233-4455-6677-8899-aabbccddeeff`
   string session_id = 1;
 
   // User context
@@ -581,6 +585,7 @@ message ArtifactStatusesRequest {
   // The session_id specifies a spark session for a user id (which is specified
   // by user_context.user_id). The session_id is set by the client to be able 
to
   // collate streaming responses from different queries within the dedicated 
session.
+  // The id should be an UUID string of the format 
`00112233-4455-6677-8899-aabbccddeeff`
   string session_id = 1;
 
   // User context
@@ -617,6 +622,7 @@ message InterruptRequest {
   // The session_id specifies a spark session for a user id (which is specified
   // by user_context.user_id). The session_id is set by the client to be able 
to
   // collate streaming responses from different queries within the dedicated 
session.
+  // The id should be an UUID string of the format 
`00112233-4455-6677-8899-aabbccddeeff`
   string session_id = 1;
 
   // (Required) User context
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index ad40c94d549..c8fbfca6f70 100644
--- 

[spark] branch master updated: [SPARK-44425][CONNECT] Validate that user provided sessionId is an UUID

2023-07-27 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a3bd477a6d8 [SPARK-44425][CONNECT] Validate that user provided 
sessionId is an UUID
a3bd477a6d8 is described below

commit a3bd477a6d8c317ee1e9a6aae6ebd2ef4fc67cce
Author: Juliusz Sompolski 
AuthorDate: Fri Jul 28 07:55:02 2023 +0900

[SPARK-44425][CONNECT] Validate that user provided sessionId is an UUID

### What changes were proposed in this pull request?

We want to validate that the user-provided sessionId is a UUID. The existing Spark 
Connect Python and Scala clients already do that, and we would like to depend on it 
being in this format moving forward, just like we already validate that 
operationId is a UUID.
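
For illustration only (the actual validation lives in the Scala server code), a session_id is acceptable when it parses as a UUID of the documented form `00112233-4455-6677-8899-aabbccddeeff`:

```python
import uuid

def is_valid_session_id(session_id: str) -> bool:
    # Python's uuid.UUID raises ValueError for strings that are not valid UUIDs.
    try:
        uuid.UUID(session_id)
        return True
    except ValueError:
        return False

print(is_valid_session_id("00112233-4455-6677-8899-aabbccddeeff"))  # True
print(is_valid_session_id("not-a-uuid"))                            # False
```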

### Why are the changes needed?

Validate what's already assumed.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing CI.

Closes #42150 from juliuszsompolski/SPARK-44425.

Authored-by: Juliusz Sompolski 
Signed-off-by: Hyukjin Kwon 
---
 .../src/main/protobuf/spark/connect/base.proto |  6 ++
 .../sql/connect/service/SparkConnectService.scala  | 12 ++-
 .../connect/artifact/ArtifactManagerSuite.scala| 15 +-
 .../connect/planner/SparkConnectServiceSuite.scala | 23 +-
 .../connect/service/AddArtifactsHandlerSuite.scala | 13 +++-
 python/pyspark/sql/connect/proto/base_pb2.pyi  |  6 ++
 6 files changed, 55 insertions(+), 20 deletions(-)

diff --git 
a/connector/connect/common/src/main/protobuf/spark/connect/base.proto 
b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
index d935ae65328..21fd167f6b5 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
@@ -63,6 +63,7 @@ message AnalyzePlanRequest {
   // The session_id specifies a spark session for a user id (which is specified
   // by user_context.user_id). The session_id is set by the client to be able 
to
   // collate streaming responses from different queries within the dedicated 
session.
+  // The id should be an UUID string of the format 
`00112233-4455-6677-8899-aabbccddeeff`
   string session_id = 1;
 
   // (Required) User context
@@ -273,6 +274,7 @@ message ExecutePlanRequest {
   // The session_id specifies a spark session for a user id (which is specified
   // by user_context.user_id). The session_id is set by the client to be able 
to
   // collate streaming responses from different queries within the dedicated 
session.
+  // The id should be an UUID string of the format 
`00112233-4455-6677-8899-aabbccddeeff`
   string session_id = 1;
 
   // (Required) User context
@@ -407,6 +409,7 @@ message ConfigRequest {
   // The session_id specifies a spark session for a user id (which is specified
   // by user_context.user_id). The session_id is set by the client to be able 
to
   // collate streaming responses from different queries within the dedicated 
session.
+  // The id should be an UUID string of the format 
`00112233-4455-6677-8899-aabbccddeeff`
   string session_id = 1;
 
   // (Required) User context
@@ -492,6 +495,7 @@ message AddArtifactsRequest {
   // The session_id specifies a spark session for a user id (which is specified
   // by user_context.user_id). The session_id is set by the client to be able 
to
   // collate streaming responses from different queries within the dedicated 
session.
+  // The id should be an UUID string of the format 
`00112233-4455-6677-8899-aabbccddeeff`
   string session_id = 1;
 
   // User context
@@ -581,6 +585,7 @@ message ArtifactStatusesRequest {
   // The session_id specifies a spark session for a user id (which is specified
   // by user_context.user_id). The session_id is set by the client to be able 
to
   // collate streaming responses from different queries within the dedicated 
session.
+  // The id should be an UUID string of the format 
`00112233-4455-6677-8899-aabbccddeeff`
   string session_id = 1;
 
   // User context
@@ -617,6 +622,7 @@ message InterruptRequest {
   // The session_id specifies a spark session for a user id (which is specified
   // by user_context.user_id). The session_id is set by the client to be able 
to
   // collate streaming responses from different queries within the dedicated 
session.
+  // The id should be an UUID string of the format 
`00112233-4455-6677-8899-aabbccddeeff`
   string session_id = 1;
 
   // (Required) User context
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index 6b7007130be..87e4f21732f 

[spark] branch branch-3.5 updated: [SPARK-44560][PYTHON][CONNECT] Improve tests and documentation for Arrow Python UDF

2023-07-27 Thread xinrong
This is an automated email from the ASF dual-hosted git repository.

xinrong pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 36b93d07eb9 [SPARK-44560][PYTHON][CONNECT] Improve tests and 
documentation for Arrow Python UDF
36b93d07eb9 is described below

commit 36b93d07eb961905647c42fac80e22efdfb15f4f
Author: Xinrong Meng 
AuthorDate: Thu Jul 27 13:45:05 2023 -0700

[SPARK-44560][PYTHON][CONNECT] Improve tests and documentation for Arrow 
Python UDF

### What changes were proposed in this pull request?

- Test on complex return type
- Remove complex return type constraints for Arrow Python UDF on Spark 
Connect
- Update documentation of the related Spark conf

The change targets both Spark 3.5 and 4.0.
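
As a hedged illustration of the user-facing behavior (not code from this patch; the session setup and column names are made up), an Arrow-optimized Python UDF can now return complex types such as arrays, maps, and structs on Spark Connect as well:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

spark = SparkSession.builder.getOrCreate()

# The conf whose documentation this patch updates.
spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", "true")

# A UDF with a complex (array) return type; identity function for simplicity.
identity_array = udf(lambda xs: xs, "array<int>")

df = spark.range(1).selectExpr("array(1, 2, 3) AS xs")
df.select(identity_array("xs").alias("out")).show()  # [1, 2, 3]
```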

### Why are the changes needed?
Testability and parity with vanilla PySpark.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit tests.

Closes #42178 from xinrong-meng/conf.

Authored-by: Xinrong Meng 
Signed-off-by: Xinrong Meng 
(cherry picked from commit 5f6537409383e2dbdd699108f708567c37db8151)
Signed-off-by: Xinrong Meng 
---
 python/pyspark/sql/connect/udf.py| 10 ++
 python/pyspark/sql/tests/test_arrow_python_udf.py|  5 -
 python/pyspark/sql/tests/test_udf.py | 16 
 .../scala/org/apache/spark/sql/internal/SQLConf.scala|  3 +--
 4 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/python/pyspark/sql/connect/udf.py 
b/python/pyspark/sql/connect/udf.py
index 0a5d06618b3..2d7e423d3d5 100644
--- a/python/pyspark/sql/connect/udf.py
+++ b/python/pyspark/sql/connect/udf.py
@@ -35,7 +35,7 @@ from pyspark.sql.connect.expressions import (
 )
 from pyspark.sql.connect.column import Column
 from pyspark.sql.connect.types import UnparsedDataType
-from pyspark.sql.types import ArrayType, DataType, MapType, StringType, 
StructType
+from pyspark.sql.types import DataType, StringType
 from pyspark.sql.udf import UDFRegistration as PySparkUDFRegistration
 from pyspark.errors import PySparkTypeError
 
@@ -70,18 +70,12 @@ def _create_py_udf(
 is_arrow_enabled = useArrow
 
 regular_udf = _create_udf(f, returnType, PythonEvalType.SQL_BATCHED_UDF)
-return_type = regular_udf.returnType
 try:
 is_func_with_args = len(getfullargspec(f).args) > 0
 except TypeError:
 is_func_with_args = False
-is_output_atomic_type = (
-not isinstance(return_type, StructType)
-and not isinstance(return_type, MapType)
-and not isinstance(return_type, ArrayType)
-)
 if is_arrow_enabled:
-if is_output_atomic_type and is_func_with_args:
+if is_func_with_args:
 return _create_arrow_py_udf(regular_udf)
 else:
 warnings.warn(
diff --git a/python/pyspark/sql/tests/test_arrow_python_udf.py 
b/python/pyspark/sql/tests/test_arrow_python_udf.py
index 264ea0b901f..f48f07666e1 100644
--- a/python/pyspark/sql/tests/test_arrow_python_udf.py
+++ b/python/pyspark/sql/tests/test_arrow_python_udf.py
@@ -47,11 +47,6 @@ class PythonUDFArrowTestsMixin(BaseUDFTestsMixin):
 def test_register_java_udaf(self):
 super(PythonUDFArrowTests, self).test_register_java_udaf()
 
-# TODO(SPARK-43903): Standardize ArrayType conversion for Python UDF
-@unittest.skip("Inconsistent ArrayType conversion with/without Arrow.")
-def test_nested_array(self):
-super(PythonUDFArrowTests, self).test_nested_array()
-
 def test_complex_input_types(self):
 row = (
 self.spark.range(1)
diff --git a/python/pyspark/sql/tests/test_udf.py 
b/python/pyspark/sql/tests/test_udf.py
index 8ffcb5e05a2..239ff27813b 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -882,6 +882,22 @@ class BaseUDFTestsMixin(object):
 row = df.select(f("nested_array")).first()
 self.assertEquals(row[0], [[1, 2], [3, 4], [4, 5]])
 
+def test_complex_return_types(self):
+row = (
+self.spark.range(1)
+.selectExpr("array(1, 2, 3) as array", "map('a', 'b') as map", 
"struct(1, 2) as struct")
+.select(
+udf(lambda x: x, "array")("array"),
+udf(lambda x: x, "map")("map"),
+udf(lambda x: x, "struct")("struct"),
+)
+.first()
+)
+
+self.assertEquals(row[0], [1, 2, 3])
+self.assertEquals(row[1], {"a": "b"})
+self.assertEquals(row[2], Row(col1=1, col2=2))
+
 
 class UDFTests(BaseUDFTestsMixin, ReusedSQLTestCase):
 @classmethod
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 

[spark] branch master updated: [SPARK-44560][PYTHON][CONNECT] Improve tests and documentation for Arrow Python UDF

2023-07-27 Thread xinrong
This is an automated email from the ASF dual-hosted git repository.

xinrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5f653740938 [SPARK-44560][PYTHON][CONNECT] Improve tests and 
documentation for Arrow Python UDF
5f653740938 is described below

commit 5f6537409383e2dbdd699108f708567c37db8151
Author: Xinrong Meng 
AuthorDate: Thu Jul 27 13:45:05 2023 -0700

[SPARK-44560][PYTHON][CONNECT] Improve tests and documentation for Arrow 
Python UDF

### What changes were proposed in this pull request?

- Test on complex return type
- Remove complex return type constraints for Arrow Python UDF on Spark 
Connect
- Update documentation of the related Spark conf

The change targets both Spark 3.5 and 4.0.

### Why are the changes needed?
Testability and parity with vanilla PySpark.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit tests.

Closes #42178 from xinrong-meng/conf.

Authored-by: Xinrong Meng 
Signed-off-by: Xinrong Meng 
---
 python/pyspark/sql/connect/udf.py| 10 ++
 python/pyspark/sql/tests/test_arrow_python_udf.py|  5 -
 python/pyspark/sql/tests/test_udf.py | 16 
 .../scala/org/apache/spark/sql/internal/SQLConf.scala|  3 +--
 4 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/python/pyspark/sql/connect/udf.py 
b/python/pyspark/sql/connect/udf.py
index 0a5d06618b3..2d7e423d3d5 100644
--- a/python/pyspark/sql/connect/udf.py
+++ b/python/pyspark/sql/connect/udf.py
@@ -35,7 +35,7 @@ from pyspark.sql.connect.expressions import (
 )
 from pyspark.sql.connect.column import Column
 from pyspark.sql.connect.types import UnparsedDataType
-from pyspark.sql.types import ArrayType, DataType, MapType, StringType, 
StructType
+from pyspark.sql.types import DataType, StringType
 from pyspark.sql.udf import UDFRegistration as PySparkUDFRegistration
 from pyspark.errors import PySparkTypeError
 
@@ -70,18 +70,12 @@ def _create_py_udf(
 is_arrow_enabled = useArrow
 
 regular_udf = _create_udf(f, returnType, PythonEvalType.SQL_BATCHED_UDF)
-return_type = regular_udf.returnType
 try:
 is_func_with_args = len(getfullargspec(f).args) > 0
 except TypeError:
 is_func_with_args = False
-is_output_atomic_type = (
-not isinstance(return_type, StructType)
-and not isinstance(return_type, MapType)
-and not isinstance(return_type, ArrayType)
-)
 if is_arrow_enabled:
-if is_output_atomic_type and is_func_with_args:
+if is_func_with_args:
 return _create_arrow_py_udf(regular_udf)
 else:
 warnings.warn(
diff --git a/python/pyspark/sql/tests/test_arrow_python_udf.py 
b/python/pyspark/sql/tests/test_arrow_python_udf.py
index 264ea0b901f..f48f07666e1 100644
--- a/python/pyspark/sql/tests/test_arrow_python_udf.py
+++ b/python/pyspark/sql/tests/test_arrow_python_udf.py
@@ -47,11 +47,6 @@ class PythonUDFArrowTestsMixin(BaseUDFTestsMixin):
 def test_register_java_udaf(self):
 super(PythonUDFArrowTests, self).test_register_java_udaf()
 
-# TODO(SPARK-43903): Standardize ArrayType conversion for Python UDF
-@unittest.skip("Inconsistent ArrayType conversion with/without Arrow.")
-def test_nested_array(self):
-super(PythonUDFArrowTests, self).test_nested_array()
-
 def test_complex_input_types(self):
 row = (
 self.spark.range(1)
diff --git a/python/pyspark/sql/tests/test_udf.py 
b/python/pyspark/sql/tests/test_udf.py
index 8ffcb5e05a2..239ff27813b 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -882,6 +882,22 @@ class BaseUDFTestsMixin(object):
 row = df.select(f("nested_array")).first()
 self.assertEquals(row[0], [[1, 2], [3, 4], [4, 5]])
 
+def test_complex_return_types(self):
+row = (
+self.spark.range(1)
+.selectExpr("array(1, 2, 3) as array", "map('a', 'b') as map", 
"struct(1, 2) as struct")
+.select(
+udf(lambda x: x, "array")("array"),
+udf(lambda x: x, "map")("map"),
+udf(lambda x: x, "struct")("struct"),
+)
+.first()
+)
+
+self.assertEquals(row[0], [1, 2, 3])
+self.assertEquals(row[1], {"a": "b"})
+self.assertEquals(row[2], Row(col1=1, col2=2))
+
 
 class UDFTests(BaseUDFTestsMixin, ReusedSQLTestCase):
 @classmethod
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index af2ec777d6b..2674ae7f4a6 100644
--- 

[spark] branch master updated: [SPARK-43968][PYTHON] Improve error messages for Python UDTFs with wrong number of outputs

2023-07-27 Thread ueshin
This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7194ce9263f [SPARK-43968][PYTHON] Improve error messages for Python 
UDTFs with wrong number of outputs
7194ce9263f is described below

commit 7194ce9263fe1683c039a1aaf9462657b1672a99
Author: allisonwang-db 
AuthorDate: Thu Jul 27 13:18:39 2023 -0700

[SPARK-43968][PYTHON] Improve error messages for Python UDTFs with wrong 
number of outputs

### What changes were proposed in this pull request?

This PR improves the error messages for Python UDTFs when the number of outputs does not match the number of columns specified in the UDTF's return type.

### Why are the changes needed?

To make Python UDTFs more user-friendly.

### Does this PR introduce _any_ user-facing change?

Yes. This PR improves the error messages.
Before this change, the error thrown by Spark was a Java `IllegalStateException`:
```
java.lang.IllegalStateException: Input row doesn't have expected number of 
values required by the schema
```
After this PR, it will throw a clearer error message with an error class:
```
[UDTF_RETURN_SCHEMA_MISMATCH] The number of columns in the result does not 
match the specified schema
```
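
A minimal sketch of the kind of mismatch that now triggers the clearer error (assuming an active SparkSession; the class name is made up):

```python
from pyspark.sql.functions import lit, udtf

# Declares two output columns but yields only one value per row.
@udtf(returnType="a: int, b: int")
class MismatchedUDTF:
    def eval(self, x: int):
        yield (x,)  # one value where the schema expects two

# Collecting should now fail with [UDTF_RETURN_SCHEMA_MISMATCH]
# instead of an opaque java.lang.IllegalStateException.
MismatchedUDTF(lit(1)).collect()
```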

### How was this patch tested?

Existing tests and new unit tests.

Closes #42157 from allisonwang-db/spark-43968-py-udtf-checks.

Authored-by: allisonwang-db 
Signed-off-by: Takuya UESHIN 
---
 python/pyspark/errors/error_classes.py |   5 +
 python/pyspark/sql/connect/udtf.py |   4 +-
 .../pyspark/sql/tests/connect/test_parity_udtf.py  |  50 
 python/pyspark/sql/tests/test_udtf.py  | 133 +++--
 python/pyspark/sql/udtf.py |   9 +-
 python/pyspark/worker.py   |  22 +++-
 6 files changed, 99 insertions(+), 124 deletions(-)

diff --git a/python/pyspark/errors/error_classes.py 
b/python/pyspark/errors/error_classes.py
index e0d1c30b604..f4b643f1d32 100644
--- a/python/pyspark/errors/error_classes.py
+++ b/python/pyspark/errors/error_classes.py
@@ -283,6 +283,11 @@ ERROR_CLASSES_JSON = """
   "The eval type for the UDTF '' is invalid. It must be one of 
."
 ]
   },
+  "INVALID_UDTF_HANDLER_TYPE" : {
+"message" : [
+  "The UDTF is invalid. The function handler must be a class, but got 
''. Please provide a class as the function handler."
+]
+  },
   "INVALID_UDTF_NO_EVAL" : {
 "message" : [
   "The UDTF '' is invalid. It does not implement the required 'eval' 
method. Please implement the 'eval' method in '' and try again."
diff --git a/python/pyspark/sql/connect/udtf.py 
b/python/pyspark/sql/connect/udtf.py
index 74c55cc42cd..919994401c8 100644
--- a/python/pyspark/sql/connect/udtf.py
+++ b/python/pyspark/sql/connect/udtf.py
@@ -124,6 +124,8 @@ class UserDefinedTableFunction:
 evalType: int = PythonEvalType.SQL_TABLE_UDF,
 deterministic: bool = True,
 ) -> None:
+_validate_udtf_handler(func, returnType)
+
 self.func = func
 self.returnType: Optional[DataType] = (
 None
@@ -136,8 +138,6 @@ class UserDefinedTableFunction:
 self.evalType = evalType
 self.deterministic = deterministic
 
-_validate_udtf_handler(func, returnType)
-
 def _build_common_inline_user_defined_table_function(
 self, *cols: "ColumnOrName"
 ) -> CommonInlineUserDefinedTableFunction:
diff --git a/python/pyspark/sql/tests/connect/test_parity_udtf.py 
b/python/pyspark/sql/tests/connect/test_parity_udtf.py
index 1aff1bd0686..748b611e667 100644
--- a/python/pyspark/sql/tests/connect/test_parity_udtf.py
+++ b/python/pyspark/sql/tests/connect/test_parity_udtf.py
@@ -56,56 +56,6 @@ class UDTFParityTests(BaseUDTFTestsMixin, 
ReusedConnectTestCase):
 ):
 TestUDTF(lit(1)).collect()
 
-def test_udtf_with_wrong_num_output(self):
-err_msg = (
-"java.lang.IllegalStateException: Input row doesn't have expected 
number of "
-+ "values required by the schema."
-)
-
-@udtf(returnType="a: int, b: int")
-class TestUDTF:
-def eval(self, a: int):
-yield a,
-
-with self.assertRaisesRegex(SparkConnectGrpcException, err_msg):
-TestUDTF(lit(1)).collect()
-
-@udtf(returnType="a: int")
-class TestUDTF:
-def eval(self, a: int):
-yield a, a + 1
-
-with self.assertRaisesRegex(SparkConnectGrpcException, err_msg):
-TestUDTF(lit(1)).collect()
-
-def test_udtf_terminate_with_wrong_num_output(self):
-err_msg = (
-"java.lang.IllegalStateException: Input row doesn't have expected 

[spark] branch master updated: [MINOR][DOCS] fix: some minor typos

2023-07-27 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 921fb289f00 [MINOR][DOCS] fix: some minor typos
921fb289f00 is described below

commit 921fb289f003317d89120faa6937e4abd359195c
Author: Eric Blanco 
AuthorDate: Thu Jul 27 08:53:54 2023 -0500

[MINOR][DOCS] fix: some minor typos

### What changes were proposed in this pull request?
Change `the the` to `the`

### Why are the changes needed?
To fix the typo

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

Closes #42188 from ejblanco/docs/spark-typos.

Authored-by: Eric Blanco 
Signed-off-by: Sean Owen 
---
 .../spark/sql/connect/service/SparkConnectStreamingQueryCache.scala | 2 +-
 .../org/apache/spark/ui/static/vis-timeline-graph2d.min.js.map  | 2 +-
 dev/connect-jvm-client-mima-check   | 2 +-
 .../main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala| 2 +-
 .../scala/org/apache/spark/sql/catalyst/expressions/WindowTime.scala| 2 +-
 sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +-
 6 files changed, 6 insertions(+), 6 deletions(-)

diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala
index 133686df018..87004242da9 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala
@@ -84,7 +84,7 @@ private[connect] class SparkConnectStreamingQueryCache(
 
   /**
* Returns [[StreamingQuery]] if it is cached and session matches the cached 
query. It ensures
-   * the the session associated with it matches the session passed into the 
call. If the query is
+   * the session associated with it matches the session passed into the call. 
If the query is
* inactive (i.e. it has a cache expiry time set), this access extends its 
expiry time. So if a
* client keeps accessing a query, it stays in the cache.
*/
diff --git 
a/core/src/main/resources/org/apache/spark/ui/static/vis-timeline-graph2d.min.js.map
 
b/core/src/main/resources/org/apache/spark/ui/static/vis-timeline-graph2d.min.js.map
index 95fdc523cf4..250b375e545 100644
--- 
a/core/src/main/resources/org/apache/spark/ui/static/vis-timeline-graph2d.min.js.map
+++ 
b/core/src/main/resources/org/apache/spark/ui/static/vis-timeline-graph2d.min.js.map
@@ -1 +1 @@
-{"version":3,"file":"vis-timeline-graph2d.min.js","sources":["../../node_modules/moment/locale/de.js","../../node_modules/moment/moment.js","../../node_modules/moment/locale/es.js","../../node_modules/moment/locale/fr.js","../../node_modules/moment/locale/it.js","../../node_modules/moment/locale/ja.js","../../node_modules/moment/locale/nl.js","../../node_modules/moment/locale/pl.js","../../node_modules/moment/locale/ru.js","../../node_modules/moment/locale/uk.js","../../node_modules/core
 [...]
\ No newline at end of file
+{"version":3,"file":"vis-timeline-graph2d.min.js","sources":["../../node_modules/moment/locale/de.js","../../node_modules/moment/moment.js","../../node_modules/moment/locale/es.js","../../node_modules/moment/locale/fr.js","../../node_modules/moment/locale/it.js","../../node_modules/moment/locale/ja.js","../../node_modules/moment/locale/nl.js","../../node_modules/moment/locale/pl.js","../../node_modules/moment/locale/ru.js","../../node_modules/moment/locale/uk.js","../../node_modules/core
 [...]
\ No newline at end of file
diff --git a/dev/connect-jvm-client-mima-check 
b/dev/connect-jvm-client-mima-check
index ac4b95935b9..6a29cbf08ce 100755
--- a/dev/connect-jvm-client-mima-check
+++ b/dev/connect-jvm-client-mima-check
@@ -52,7 +52,7 @@ echo "finish connect-client-jvm module mima check ..."
 
 RESULT_SIZE=$(wc -l .connect-mima-check-result | awk '{print $1}')
 
-# The the file has no content if check passed.
+# The file has no content if check passed.
 if [[ $RESULT_SIZE -eq "0" ]]; then
   ERRORS=""
 else
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 3ece74a4d18..92e550ea941 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2623,7 +2623,7 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
   withOrigin(t.origin)(t.copy(hasTried = 

[spark] branch branch-3.5 updated: [SPARK-44505][SQL] Provide override for columnar support in Scan for DSv2

2023-07-27 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 48494e1fa49 [SPARK-44505][SQL] Provide override for columnar support 
in Scan for DSv2
48494e1fa49 is described below

commit 48494e1fa49e93239a3fb240e7fdbce015f5cc0c
Author: Martin Grund 
AuthorDate: Thu Jul 27 20:53:43 2023 +0800

[SPARK-44505][SQL] Provide override for columnar support in Scan for DSv2

### What changes were proposed in this pull request?
Previously, when a new DSv2 data source was implemented, planning would always call `BatchScanExec:supportsColumnar`, which in turn iterates over all input partitions to check whether they support columnar data or not.

When the `planInputPartitions` method is expensive, this can be problematic. This patch adds an option to the Scan interface that allows specifying a default value. For backward compatibility the default provided by the Scan interface is partition-defined, but a Scan can change it accordingly.

To fully support the changes of this PR, the following additional changes had to be made:

* `DataSourceV2ScanExecBase::outputPartitioning`: removed the case for single partitions.
* `lazy val DataSourceV2ScanExecBase::groupedPartitions`: added a special check for empty key-group partitioning so that the simple case does not trigger a materialization of the input partitions during planning.

Additionally:
* Fixes similar issues as https://github.com/apache/spark/pull/40004

### Why are the changes needed?
Avoid costly operations during explain operations.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added new UT.

Closes #42099 from grundprinzip/SPARK-44505.

Authored-by: Martin Grund 
Signed-off-by: Wenchen Fan 
(cherry picked from commit 01191c83f8c77f5dcc85b9017551023d81ed0d45)
Signed-off-by: Wenchen Fan 
---
 .../org/apache/spark/sql/connector/read/Scan.java  | 24 
 .../datasources/v2/DataSourceV2ScanExecBase.scala  | 44 +-
 .../spark/sql/connector/DataSourceV2Suite.scala| 37 ++
 .../connector/KeyGroupedPartitioningSuite.scala|  5 ++-
 .../command/AlignAssignmentsSuiteBase.scala|  4 +-
 5 files changed, 93 insertions(+), 21 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
index 8f79c656210..969a47be707 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
@@ -125,4 +125,28 @@ public interface Scan {
   default CustomTaskMetric[] reportDriverMetrics() {
 return new CustomTaskMetric[]{};
   }
+
+  /**
+   * This enum defines how the columnar support for the partitions of the data 
source
+   * should be determined. The default value is `PARTITION_DEFINED` which 
indicates that each
+   * partition can determine if it should be columnar or not. SUPPORTED and 
UNSUPPORTED provide
+   * default shortcuts to indicate support for columnar data or not.
+   *
+   * @since 3.5.0
+   */
+  enum ColumnarSupportMode {
+PARTITION_DEFINED,
+SUPPORTED,
+UNSUPPORTED
+  }
+
+  /**
+   * Subclasses can implement this method to indicate if the support for 
columnar data should
+   * be determined by each partition or is set as a default for the whole scan.
+   *
+   * @since 3.5.0
+   */
+  default ColumnarSupportMode columnarSupportMode() {
+return ColumnarSupportMode.PARTITION_DEFINED;
+  }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
index e539b1c4ee3..f688d3514d9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
@@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, RowOrdering, 
SortOrder}
 import org.apache.spark.sql.catalyst.plans.physical
-import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, 
SinglePartition}
+import org.apache.spark.sql.catalyst.plans.physical.KeyGroupedPartitioning
 import org.apache.spark.sql.catalyst.util.{truncatedString, 
InternalRowComparableWrapper}
 import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, 
PartitionReaderFactory, Scan}
 import org.apache.spark.sql.execution.{ExplainUtils, 

[spark] branch master updated: [SPARK-44505][SQL] Provide override for columnar support in Scan for DSv2

2023-07-27 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 01191c83f8c [SPARK-44505][SQL] Provide override for columnar support 
in Scan for DSv2
01191c83f8c is described below

commit 01191c83f8c77f5dcc85b9017551023d81ed0d45
Author: Martin Grund 
AuthorDate: Thu Jul 27 20:53:43 2023 +0800

[SPARK-44505][SQL] Provide override for columnar support in Scan for DSv2

### What changes were proposed in this pull request?
Previously, when a new DSv2 data source was implemented, planning would always call `BatchScanExec:supportsColumnar`, which in turn iterates over all input partitions to check whether they support columnar data or not.

When the `planInputPartitions` method is expensive, this can be problematic. This patch adds an option to the Scan interface that allows specifying a default value. For backward compatibility the default provided by the Scan interface is partition-defined, but a Scan can change it accordingly.

To fully support the changes of this PR, the following additional changes had to be made:

* `DataSourceV2ScanExecBase::outputPartitioning`: removed the case for single partitions.
* `lazy val DataSourceV2ScanExecBase::groupedPartitions`: added a special check for empty key-group partitioning so that the simple case does not trigger a materialization of the input partitions during planning.

Additionally:
* Fixes similar issues as https://github.com/apache/spark/pull/40004

### Why are the changes needed?
Avoid costly operations during explain operations.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added new UT.

Closes #42099 from grundprinzip/SPARK-44505.

Authored-by: Martin Grund 
Signed-off-by: Wenchen Fan 
---
 .../org/apache/spark/sql/connector/read/Scan.java  | 24 
 .../datasources/v2/DataSourceV2ScanExecBase.scala  | 44 +-
 .../spark/sql/connector/DataSourceV2Suite.scala| 37 ++
 .../connector/KeyGroupedPartitioningSuite.scala|  5 ++-
 .../command/AlignAssignmentsSuiteBase.scala|  4 +-
 5 files changed, 93 insertions(+), 21 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
index 8f79c656210..969a47be707 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
@@ -125,4 +125,28 @@ public interface Scan {
   default CustomTaskMetric[] reportDriverMetrics() {
 return new CustomTaskMetric[]{};
   }
+
+  /**
+   * This enum defines how the columnar support for the partitions of the data 
source
+   * should be determined. The default value is `PARTITION_DEFINED` which 
indicates that each
+   * partition can determine if it should be columnar or not. SUPPORTED and 
UNSUPPORTED provide
+   * default shortcuts to indicate support for columnar data or not.
+   *
+   * @since 3.5.0
+   */
+  enum ColumnarSupportMode {
+PARTITION_DEFINED,
+SUPPORTED,
+UNSUPPORTED
+  }
+
+  /**
+   * Subclasses can implement this method to indicate if the support for 
columnar data should
+   * be determined by each partition or is set as a default for the whole scan.
+   *
+   * @since 3.5.0
+   */
+  default ColumnarSupportMode columnarSupportMode() {
+return ColumnarSupportMode.PARTITION_DEFINED;
+  }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
index e539b1c4ee3..f688d3514d9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
@@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, RowOrdering, 
SortOrder}
 import org.apache.spark.sql.catalyst.plans.physical
-import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, 
SinglePartition}
+import org.apache.spark.sql.catalyst.plans.physical.KeyGroupedPartitioning
 import org.apache.spark.sql.catalyst.util.{truncatedString, 
InternalRowComparableWrapper}
 import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, 
PartitionReaderFactory, Scan}
 import org.apache.spark.sql.execution.{ExplainUtils, LeafExecNode, 
SQLExecution}
@@ -91,22 +91,25 @@ trait DataSourceV2ScanExecBase extends LeafExecNode {
   }
 
   

[spark] branch master updated: [SPARK-44454][SQL][HIVE] HiveShim getTablesByType support fallback

2023-07-27 Thread yumwang
This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 50b36d13132 [SPARK-44454][SQL][HIVE] HiveShim getTablesByType support 
fallback
50b36d13132 is described below

commit 50b36d131326d1dc2503f20a35891e7f1b4b0194
Author: sychen 
AuthorDate: Thu Jul 27 18:23:31 2023 +0800

[SPARK-44454][SQL][HIVE] HiveShim getTablesByType support fallback

### What changes were proposed in this pull request?
When a `Shim_v2_3#getTablesByType` call fails because the metastore has no `get_tables_by_type` method, throw `SparkUnsupportedOperationException`, so that `HiveClientImpl#listTablesByType` can fall back to an alternative call.

### Why are the changes needed?
When we use a newer Hive client to communicate with an older Hive metastore, we may encounter `Invalid method name: 'get_tables_by_type'`:

```java
23/07/17 12:45:24,391 [main] DEBUG SparkSqlParser: Parsing command: show 
views
23/07/17 12:45:24,489 [main] ERROR log: Got exception: 
org.apache.thrift.TApplicationException Invalid method name: 
'get_tables_by_type'
org.apache.thrift.TApplicationException: Invalid method name: 
'get_tables_by_type'
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:79)
    at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_tables_by_type(ThriftHiveMetastore.java:1433)
    at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_tables_by_type(ThriftHiveMetastore.java:1418)
    at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTables(HiveMetaStoreClient.java:1411)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:173)
    at com.sun.proxy.$Proxy23.getTables(Unknown Source)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler.invoke(HiveMetaStoreClient.java:2344)
    at com.sun.proxy.$Proxy23.getTables(Unknown Source)
    at 
org.apache.hadoop.hive.ql.metadata.Hive.getTablesByType(Hive.java:1427)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.spark.sql.hive.client.Shim_v2_3.getTablesByType(HiveShim.scala:1408)
    at 
org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$listTablesByType$1(HiveClientImpl.scala:789)
    at 
org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:294)
    at 
org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:225)
    at 
org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:224)
    at 
org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:274)
    at 
org.apache.spark.sql.hive.client.HiveClientImpl.listTablesByType(HiveClientImpl.scala:785)
    at 
org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$listViews$1(HiveExternalCatalog.scala:895)
    at 
org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:108)
    at 
org.apache.spark.sql.hive.HiveExternalCatalog.listViews(HiveExternalCatalog.scala:893)
    at 
org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.listViews(ExternalCatalogWithListener.scala:158)
    at 
org.apache.spark.sql.catalyst.catalog.SessionCatalog.listViews(SessionCatalog.scala:1040)
    at 
org.apache.spark.sql.execution.command.ShowViewsCommand.$anonfun$run$5(views.scala:407)
    at scala.Option.getOrElse(Option.scala:189)
    at 
org.apache.spark.sql.execution.command.ShowViewsCommand.run(views.scala:407)
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Tested with the built-in Hive 2.3.9 client communicating with a Hive metastore older than version 2.3.

Closes #42033 from cxzl25/SPARK-44454.

Lead-authored-by: sychen 

[spark] branch master updated: [SPARK-43611][PS][CONNECT][TESTS][FOLLOWUPS] Enable more tests

2023-07-27 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 9b40c0cdc5c [SPARK-43611][PS][CONNECT][TESTS][FOLLOWUPS] Enable more 
tests
9b40c0cdc5c is described below

commit 9b40c0cdc5cf8e24e66f7ee8e122702d3f157291
Author: Ruifeng Zheng 
AuthorDate: Thu Jul 27 16:30:47 2023 +0800

[SPARK-43611][PS][CONNECT][TESTS][FOLLOWUPS] Enable more tests

### What changes were proposed in this pull request?
Enable more tests; they were excluded from https://github.com/apache/spark/pull/42086 due to flaky CI issues.

### Why are the changes needed?
for test parity

### Does this PR introduce _any_ user-facing change?
no, test-only

### How was this patch tested?
enabled tests

Closes #42182 from zhengruifeng/spark_43611_followup.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 .../connect/computation/test_parity_compute.py | 12 --
 .../connect/computation/test_parity_cumulative.py  | 48 --
 .../diff_frames_ops/test_parity_basic_slow.py  | 18 +---
 .../tests/connect/frame/test_parity_time_series.py |  6 ---
 .../connect/groupby/test_parity_cumulative.py  | 30 +-
 .../tests/connect/groupby/test_parity_groupby.py   | 18 +---
 .../connect/groupby/test_parity_missing_data.py| 18 +---
 .../tests/connect/indexes/test_parity_base.py  |  6 ---
 .../connect/indexes/test_parity_reset_index.py |  6 ---
 .../tests/connect/test_parity_default_index.py |  6 ---
 .../tests/connect/test_parity_generic_functions.py |  6 +--
 ...st_parity_ops_on_diff_frames_groupby_rolling.py | 42 +--
 12 files changed, 6 insertions(+), 210 deletions(-)

diff --git 
a/python/pyspark/pandas/tests/connect/computation/test_parity_compute.py 
b/python/pyspark/pandas/tests/connect/computation/test_parity_compute.py
index 88eeb735d46..e2b92190b6e 100644
--- a/python/pyspark/pandas/tests/connect/computation/test_parity_compute.py
+++ b/python/pyspark/pandas/tests/connect/computation/test_parity_compute.py
@@ -27,22 +27,10 @@ class FrameParityComputeTests(FrameComputeMixin, 
PandasOnSparkTestUtils, ReusedC
 def psdf(self):
 return ps.from_pandas(self.pdf)
 
-@unittest.skip(
-"TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark 
Connect client."
-)
-def test_diff(self):
-super().test_diff()
-
 @unittest.skip("Spark Connect does not support RDD but the tests depend on 
them.")
 def test_mode(self):
 super().test_mode()
 
-@unittest.skip(
-"TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark 
Connect client."
-)
-def test_pct_change(self):
-super().test_pct_change()
-
 @unittest.skip("TODO(SPARK-43618): Fix pyspark.sq.column._unary_op to work 
with Spark Connect.")
 def test_rank(self):
 super().test_rank()
diff --git 
a/python/pyspark/pandas/tests/connect/computation/test_parity_cumulative.py 
b/python/pyspark/pandas/tests/connect/computation/test_parity_cumulative.py
index 8015d90aaa5..e14d296749c 100644
--- a/python/pyspark/pandas/tests/connect/computation/test_parity_cumulative.py
+++ b/python/pyspark/pandas/tests/connect/computation/test_parity_cumulative.py
@@ -29,54 +29,6 @@ class FrameParityCumulativeTests(
 def psdf(self):
 return ps.from_pandas(self.pdf)
 
-@unittest.skip(
-"TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark 
Connect client."
-)
-def test_cummax(self):
-super().test_cummax()
-
-@unittest.skip(
-"TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark 
Connect client."
-)
-def test_cummax_multiindex_columns(self):
-super().test_cummax_multiindex_columns()
-
-@unittest.skip(
-"TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark 
Connect client."
-)
-def test_cummin(self):
-super().test_cummin()
-
-@unittest.skip(
-"TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark 
Connect client."
-)
-def test_cummin_multiindex_columns(self):
-super().test_cummin_multiindex_columns()
-
-@unittest.skip(
-"TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark 
Connect client."
-)
-def test_cumprod(self):
-super().test_cumprod()
-
-@unittest.skip(
-"TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark 
Connect client."
-)
-def test_cumprod_multiindex_columns(self):
-super().test_cumprod_multiindex_columns()
-
-@unittest.skip(
-"TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark 
Connect client."
-)
-def test_cumsum(self):
-super().test_cumsum()
-
-@unittest.skip(
-   

[spark] branch branch-3.4 updated: [SPARK-44544][INFRA][FOLLOWUP] Force run `run_python_packaging_tests`

2023-07-27 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 7ff70b11248 [SPARK-44544][INFRA][FOLLOWUP] Force run 
`run_python_packaging_tests`
7ff70b11248 is described below

commit 7ff70b11248462352ca23e41ae70cd18dc2db0ba
Author: Ruifeng Zheng 
AuthorDate: Thu Jul 27 16:16:03 2023 +0800

[SPARK-44544][INFRA][FOLLOWUP] Force run `run_python_packaging_tests`

### What changes were proposed in this pull request?
run `run_python_packaging_tests` when there are any changes in PySpark

### Why are the changes needed?
https://github.com/apache/spark/pull/42146 made CI run `run_python_packaging_tests` only within `pyspark-errors` (see https://github.com/apache/spark/actions/runs/5666118302/job/15359190468 and https://github.com/apache/spark/actions/runs/5668071930/job/15358091003).


![image](https://github.com/apache/spark/assets/7322292/aef5cd4c-87ee-4b52-add3-e19ca131cdf1)

but I overlooked that `pyspark-errors` may be skipped (when there are no related source changes), so `run_python_packaging_tests` may also be skipped unexpectedly (see https://github.com/apache/spark/actions/runs/5666523657/job/15353485731).

![image](https://github.com/apache/spark/assets/7322292/c2517d39-efcf-4a95-8562-1507dad35794)

This PR runs `run_python_packaging_tests` even if `pyspark-errors` is skipped.

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
updated CI

Closes #42173 from zhengruifeng/infra_followup.

Lead-authored-by: Ruifeng Zheng 
Co-authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
(cherry picked from commit f7947341ab2984113018f7f7014bb8373a3cb3b1)
Signed-off-by: Ruifeng Zheng 
---
 dev/sparktestsupport/modules.py | 9 -
 dev/sparktestsupport/utils.py   | 2 +-
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index fd18ddd6d13..ac24ea19d0e 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -778,7 +778,14 @@ pyspark_pandas_slow = Module(
 pyspark_errors = Module(
 name="pyspark-errors",
 dependencies=[],
-source_file_regexes=["python/pyspark/errors"],
+source_file_regexes=[
+# SPARK-44544: Force the execution of pyspark_errors when there are 
any changes
+# in PySpark, since the Python Packaging Tests is only enabled within 
this module.
+# This module is the smallest Python test module, it contains only 1 
test file
+# and normally takes < 2 seconds, so the additional cost is small.
+"python/",
+"python/pyspark/errors",
+],
 python_test_goals=[
 # unittests
 "pyspark.errors.tests.test_errors",
diff --git a/dev/sparktestsupport/utils.py b/dev/sparktestsupport/utils.py
index 6b190eb5ab2..19e6d8917e6 100755
--- a/dev/sparktestsupport/utils.py
+++ b/dev/sparktestsupport/utils.py
@@ -38,7 +38,7 @@ def determine_modules_for_files(filenames):
 and `README.md` is always ignored too.
 
 >>> sorted(x.name for x in 
determine_modules_for_files(["python/pyspark/a.py", "sql/core/foo"]))
-['pyspark-core', 'sql']
+['pyspark-core', 'pyspark-errors', 'sql']
 >>> [x.name for x in 
determine_modules_for_files(["file_not_matched_by_any_subproject"])]
 ['root']
 >>> [x.name for x in determine_modules_for_files(["appveyor.yml", 
"sql/README.md"])]





[spark] branch master updated: [SPARK-44544][INFRA][FOLLOWUP] Force run `run_python_packaging_tests`

2023-07-27 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f7947341ab2 [SPARK-44544][INFRA][FOLLOWUP] Force run 
`run_python_packaging_tests`
f7947341ab2 is described below

commit f7947341ab2984113018f7f7014bb8373a3cb3b1
Author: Ruifeng Zheng 
AuthorDate: Thu Jul 27 16:16:03 2023 +0800

[SPARK-44544][INFRA][FOLLOWUP] Force run `run_python_packaging_tests`

### What changes were proposed in this pull request?
run `run_python_packaging_tests` when there are any changes in PySpark

### Why are the changes needed?
https://github.com/apache/spark/pull/42146 made CI run `run_python_packaging_tests` only within `pyspark-errors` (see https://github.com/apache/spark/actions/runs/5666118302/job/15359190468 and https://github.com/apache/spark/actions/runs/5668071930/job/15358091003).


![image](https://github.com/apache/spark/assets/7322292/aef5cd4c-87ee-4b52-add3-e19ca131cdf1)

but I overlooked that `pyspark-errors` may be skipped (when there are no related source changes), so `run_python_packaging_tests` may also be skipped unexpectedly (see https://github.com/apache/spark/actions/runs/5666523657/job/15353485731).

![image](https://github.com/apache/spark/assets/7322292/c2517d39-efcf-4a95-8562-1507dad35794)

This PR runs `run_python_packaging_tests` even if `pyspark-errors` is skipped.

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
updated CI

Closes #42173 from zhengruifeng/infra_followup.

Lead-authored-by: Ruifeng Zheng 
Co-authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 dev/sparktestsupport/modules.py | 9 -
 dev/sparktestsupport/utils.py   | 2 +-
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 9d0ba219e79..3cfd82c3d31 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -1040,7 +1040,14 @@ pyspark_pandas_slow_connect = Module(
 pyspark_errors = Module(
 name="pyspark-errors",
 dependencies=[],
-source_file_regexes=["python/pyspark/errors"],
+source_file_regexes=[
+# SPARK-44544: Force the execution of pyspark_errors when there are 
any changes
+# in PySpark, since the Python Packaging Tests is only enabled within 
this module.
+# This module is the smallest Python test module, it contains only 1 
test file
+# and normally takes < 2 seconds, so the additional cost is small.
+"python/",
+"python/pyspark/errors",
+],
 python_test_goals=[
 # unittests
 "pyspark.errors.tests.test_errors",
diff --git a/dev/sparktestsupport/utils.py b/dev/sparktestsupport/utils.py
index 339534bec25..816c982bd60 100755
--- a/dev/sparktestsupport/utils.py
+++ b/dev/sparktestsupport/utils.py
@@ -38,7 +38,7 @@ def determine_modules_for_files(filenames):
 and `README.md` is always ignored too.
 
 >>> sorted(x.name for x in 
determine_modules_for_files(["python/pyspark/a.py", "sql/core/foo"]))
-['pyspark-core', 'sql']
+['pyspark-core', 'pyspark-errors', 'sql']
 >>> [x.name for x in 
determine_modules_for_files(["file_not_matched_by_any_subproject"])]
 ['root']
 >>> [x.name for x in determine_modules_for_files(["appveyor.yml", 
"sql/README.md"])]





[spark] branch branch-3.5 updated: [SPARK-44544][INFRA][FOLLOWUP] Force run `run_python_packaging_tests`

2023-07-27 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 9dd388a1fd7 [SPARK-44544][INFRA][FOLLOWUP] Force run 
`run_python_packaging_tests`
9dd388a1fd7 is described below

commit 9dd388a1fd793d4cdf3cc8586f59220c1d0bcefb
Author: Ruifeng Zheng 
AuthorDate: Thu Jul 27 16:16:03 2023 +0800

[SPARK-44544][INFRA][FOLLOWUP] Force run `run_python_packaging_tests`

### What changes were proposed in this pull request?
run `run_python_packaging_tests` when there are any changes in PySpark

### Why are the changes needed?
https://github.com/apache/spark/pull/42146 made CI run `run_python_packaging_tests` only within `pyspark-errors` (see https://github.com/apache/spark/actions/runs/5666118302/job/15359190468 and https://github.com/apache/spark/actions/runs/5668071930/job/15358091003).


![image](https://github.com/apache/spark/assets/7322292/aef5cd4c-87ee-4b52-add3-e19ca131cdf1)

but I overlooked that `pyspark-errors` may be skipped (when there are no related source changes), so `run_python_packaging_tests` may also be skipped unexpectedly (see https://github.com/apache/spark/actions/runs/5666523657/job/15353485731).

![image](https://github.com/apache/spark/assets/7322292/c2517d39-efcf-4a95-8562-1507dad35794)

This PR runs `run_python_packaging_tests` even if `pyspark-errors` is skipped.

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
updated CI

Closes #42173 from zhengruifeng/infra_followup.

Lead-authored-by: Ruifeng Zheng 
Co-authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
(cherry picked from commit f7947341ab2984113018f7f7014bb8373a3cb3b1)
Signed-off-by: Ruifeng Zheng 
---
 dev/sparktestsupport/modules.py | 9 -
 dev/sparktestsupport/utils.py   | 2 +-
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 9d0ba219e79..3cfd82c3d31 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -1040,7 +1040,14 @@ pyspark_pandas_slow_connect = Module(
 pyspark_errors = Module(
 name="pyspark-errors",
 dependencies=[],
-source_file_regexes=["python/pyspark/errors"],
+source_file_regexes=[
+# SPARK-44544: Force the execution of pyspark_errors when there are 
any changes
+# in PySpark, since the Python Packaging Tests is only enabled within 
this module.
+# This module is the smallest Python test module, it contains only 1 
test file
+# and normally takes < 2 seconds, so the additional cost is small.
+"python/",
+"python/pyspark/errors",
+],
 python_test_goals=[
 # unittests
 "pyspark.errors.tests.test_errors",
diff --git a/dev/sparktestsupport/utils.py b/dev/sparktestsupport/utils.py
index 339534bec25..816c982bd60 100755
--- a/dev/sparktestsupport/utils.py
+++ b/dev/sparktestsupport/utils.py
@@ -38,7 +38,7 @@ def determine_modules_for_files(filenames):
 and `README.md` is always ignored too.
 
 >>> sorted(x.name for x in 
determine_modules_for_files(["python/pyspark/a.py", "sql/core/foo"]))
-['pyspark-core', 'sql']
+['pyspark-core', 'pyspark-errors', 'sql']
 >>> [x.name for x in 
determine_modules_for_files(["file_not_matched_by_any_subproject"])]
 ['root']
 >>> [x.name for x in determine_modules_for_files(["appveyor.yml", 
"sql/README.md"])]





[spark] branch master updated: [SPARK-44536][BUILD] Upgrade sbt to 1.9.3

2023-07-27 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 918d1581fb0 [SPARK-44536][BUILD] Upgrade sbt to 1.9.3
918d1581fb0 is described below

commit 918d1581fb0f79e0cd9a0bedee60ea6060620a31
Author: panbingkun 
AuthorDate: Thu Jul 27 15:55:48 2023 +0800

[SPARK-44536][BUILD] Upgrade sbt to 1.9.3

### What changes were proposed in this pull request?
This PR aims to upgrade sbt from 1.9.2 to 1.9.3.

### Why are the changes needed?
1. The new version brings some improvements:
Actionable diagnostics (aka quickfix)
Actionable diagnostics, or quickfix, is an area in Scala tooling that's 
been getting attention since Chris Kipp presented it in the March 2023 Tooling 
Summit. Chris has written the 
[roadmap](https://contributors.scala-lang.org/t/roadmap-for-actionable-diagnostics/6172/1)
 and sent https://github.com/sbt/sbt/pull/7242 that kickstarted the effort, but 
now there's been steady progress in 
https://github.com/build-server-protocol/build-server-protocol/pull/527, 
https://github.com/lampepfl/d [...]
sbt 1.9.3 adds a new interface called AnalysisCallback2 to relay code actions from the compiler(s) to Zinc's Analysis file. Future versions of Scala 2.13.x (and hopefully Scala 3) will ship with proper code actions, but as a demo I've implemented a code action for procedure-syntax usages even on the current Scala 2.13.11 with the -deprecation flag.

2. Full release notes:
https://github.com/sbt/sbt/releases/tag/v1.9.3

3. v1.9.2 vs v1.9.3
https://github.com/sbt/sbt/compare/v1.9.2...v1.9.3

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA.

Closes #42141 from panbingkun/SPARK-44536.

Authored-by: panbingkun 
Signed-off-by: yangjie01 
---
 dev/appveyor-install-dependencies.ps1 | 2 +-
 project/build.properties  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/dev/appveyor-install-dependencies.ps1 
b/dev/appveyor-install-dependencies.ps1
index 3737382eb86..db154cd51da 100644
--- a/dev/appveyor-install-dependencies.ps1
+++ b/dev/appveyor-install-dependencies.ps1
@@ -97,7 +97,7 @@ if (!(Test-Path $tools)) {
 # == SBT
 Push-Location $tools
 
-$sbtVer = "1.9.2"
+$sbtVer = "1.9.3"
 Start-FileDownload "https://github.com/sbt/sbt/releases/download/v$sbtVer/sbt-$sbtVer.zip" "sbt.zip"
 
 # extract
diff --git a/project/build.properties b/project/build.properties
index 3eb34b94744..e883bb7bdf3 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 # Please update the version in appveyor-install-dependencies.ps1 together.
-sbt.version=1.9.2
+sbt.version=1.9.3





[spark] branch master updated: [SPARK-44482][CONNECT] Connect server should can specify the bind address

2023-07-27 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new deaa2f7200b [SPARK-44482][CONNECT] Connect server should can specify 
the bind address
deaa2f7200b is described below

commit deaa2f7200bb79b5340c722bc8707dd45d50a1c2
Author: panbingkun 
AuthorDate: Thu Jul 27 16:43:01 2023 +0900

[SPARK-44482][CONNECT] Connect server should can specify the bind address

### What changes were proposed in this pull request?
When a machine has multiple network interfaces, we may want only users on a certain network segment to be able to connect to the Connect server. I propose that, in addition to the port, the Connect server can also specify the bind address.

https://github.com/apache/spark/blob/047fad5861daedbdb58111b223e76c05784f4951/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala#L26-L30
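
As a hedged usage sketch (the address and port below are made-up values), a server bound via the new conf is reachable only on that interface, and a PySpark Connect client would then point `remote()` at it:

```python
from pyspark.sql import SparkSession

# Server side (conceptually), the Connect server is started with, e.g.:
#   spark.connect.grpc.binding.address=10.0.0.5
#   spark.connect.grpc.binding.port=15002
# so the gRPC endpoint is bound to that interface only.

# Client side: connect to the address the server was bound to.
spark = SparkSession.builder.remote("sc://10.0.0.5:15002").getOrCreate()
spark.range(3).show()
```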

### Why are the changes needed?
1. The Connect server should be able to specify the bind address, which improves flexibility.
2. As with other Spark components, a bind address can also be specified, for example:

https://github.com/apache/spark/blob/047fad5861daedbdb58111b223e76c05784f4951/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java#L147-L149

https://github.com/apache/spark/blob/047fad5861daedbdb58111b223e76c05784f4951/core/src/main/scala/org/apache/spark/SparkEnv.scala#L177-L196

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
- Manual testing.
- Pass GA.

Closes #42073 from panbingkun/SPARK-44482.

Authored-by: panbingkun 
Signed-off-by: Hyukjin Kwon 
---
 .../org/apache/spark/sql/connect/config/Connect.scala|  6 ++
 .../spark/sql/connect/service/SparkConnectService.scala  | 16 +++-
 2 files changed, 17 insertions(+), 5 deletions(-)

diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
index 13e4b9f5364..31f119047e4 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
@@ -23,6 +23,12 @@ import 
org.apache.spark.sql.connect.common.config.ConnectCommon
 object Connect {
   import org.apache.spark.sql.internal.SQLConf.buildStaticConf
 
+  val CONNECT_GRPC_BINDING_ADDRESS =
+ConfigBuilder("spark.connect.grpc.binding.address")
+  .version("4.0.0")
+  .stringConf
+  .createOptional
+
   val CONNECT_GRPC_BINDING_PORT =
 ConfigBuilder("spark.connect.grpc.binding.port")
   .version("3.4.0")
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index ad40c94d549..6b7007130be 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.connect.service
 
+import java.net.InetSocketAddress
 import java.util.concurrent.TimeUnit
 
 import com.google.common.base.Ticker
@@ -32,7 +33,7 @@ import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.connect.config.Connect.{CONNECT_GRPC_BINDING_PORT, CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE}
+import org.apache.spark.sql.connect.config.Connect.{CONNECT_GRPC_BINDING_ADDRESS, CONNECT_GRPC_BINDING_PORT, CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE}
 import org.apache.spark.sql.connect.utils.ErrorUtils
 
 /**
@@ -167,7 +168,7 @@ class SparkConnectService(debug: Boolean)
 * Used to start the overall SparkConnect service and provides global state to manage the
  * different SparkSession from different users connecting to the cluster.
  */
-object SparkConnectService {
+object SparkConnectService extends Logging {
 
   private val CACHE_SIZE = 100
 
@@ -245,10 +246,15 @@ object SparkConnectService {
*/
   private def startGRPCService(): Unit = {
 val debugMode = SparkEnv.get.conf.getBoolean("spark.connect.grpc.debug.enabled", true)
+val bindAddress = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_ADDRESS)
 val port = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
-val sb = NettyServerBuilder
-  .forPort(port)
-  .maxInboundMessageSize(SparkEnv.get.conf.get(CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE).toInt)
+val sb = bindAddress 
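
A possible shape for the builder selection that the new bindAddress enables is 
sketched below. The identifiers come from the surrounding diff; the exact control 
flow is an assumption for illustration, not the committed code.

    // Sketch only: choose the Netty builder based on the optional bind address.
    // Assumes the imports already in this file (io.grpc.netty.NettyServerBuilder,
    // plus java.net.InetSocketAddress added by this patch).
    val sb = bindAddress match {
      case Some(hostname) =>
        // Bind the gRPC endpoint to the requested interface only.
        NettyServerBuilder.forAddress(new InetSocketAddress(hostname, port))
      case None =>
        // No address configured: keep the old behaviour and listen on all interfaces.
        NettyServerBuilder.forPort(port)
    }
    sb.maxInboundMessageSize(SparkEnv.get.conf.get(CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE).toInt)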

[spark] branch branch-3.4 updated: [SPARK-44513][BUILD][3.4] Upgrade snappy-java to 1.1.10.3

2023-07-27 Thread yumwang
This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 2456455fbba [SPARK-44513][BUILD][3.4] Upgrade snappy-java to 1.1.10.3
2456455fbba is described below

commit 2456455fbbafb97d5fd168222868dfb93ee284f0
Author: panbingkun 
AuthorDate: Thu Jul 27 15:01:41 2023 +0800

[SPARK-44513][BUILD][3.4] Upgrade snappy-java to 1.1.10.3

### What changes were proposed in this pull request?
- This PR targets branch-3.4.
- It upgrades snappy-java from 1.1.10.2 to 1.1.10.3.

### Why are the changes needed?
1. The newest version includes a bug fix:
- Fix the GLIBC_2.32 not found issue of libsnappyjava.so in certain Linux 
distributions on s390x, by kun-lu20 in 
https://github.com/xerial/snappy-java/pull/481

2. Release notes:
https://github.com/xerial/snappy-java/releases/tag/v1.1.10.3

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Passed GA.

Closes #42127 from panbingkun/branch-3.4_snappy_1_1_10_3.

Authored-by: panbingkun 
Signed-off-by: Yuming Wang 
---
 dev/deps/spark-deps-hadoop-2-hive-2.3 | 2 +-
 dev/deps/spark-deps-hadoop-3-hive-2.3 | 2 +-
 pom.xml   | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/dev/deps/spark-deps-hadoop-2-hive-2.3 b/dev/deps/spark-deps-hadoop-2-hive-2.3
index b96ef3c1726..6c22673a7df 100644
--- a/dev/deps/spark-deps-hadoop-2-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-2-hive-2.3
@@ -248,7 +248,7 @@ scala-xml_2.12/2.1.0//scala-xml_2.12-2.1.0.jar
 shims/0.9.38//shims-0.9.38.jar
 slf4j-api/2.0.6//slf4j-api-2.0.6.jar
 snakeyaml/1.33//snakeyaml-1.33.jar
-snappy-java/1.1.10.2//snappy-java-1.1.10.2.jar
+snappy-java/1.1.10.3//snappy-java-1.1.10.3.jar
 spire-macros_2.12/0.17.0//spire-macros_2.12-0.17.0.jar
 spire-platform_2.12/0.17.0//spire-platform_2.12-0.17.0.jar
 spire-util_2.12/0.17.0//spire-util_2.12-0.17.0.jar
diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3
index 895db6bf0df..68a5cbced62 100644
--- a/dev/deps/spark-deps-hadoop-3-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -235,7 +235,7 @@ scala-xml_2.12/2.1.0//scala-xml_2.12-2.1.0.jar
 shims/0.9.38//shims-0.9.38.jar
 slf4j-api/2.0.6//slf4j-api-2.0.6.jar
 snakeyaml/1.33//snakeyaml-1.33.jar
-snappy-java/1.1.10.2//snappy-java-1.1.10.2.jar
+snappy-java/1.1.10.3//snappy-java-1.1.10.3.jar
 spire-macros_2.12/0.17.0//spire-macros_2.12-0.17.0.jar
 spire-platform_2.12/0.17.0//spire-platform_2.12-0.17.0.jar
 spire-util_2.12/0.17.0//spire-util_2.12-0.17.0.jar
diff --git a/pom.xml b/pom.xml
index ca1dd7ab605..a681adba193 100644
--- a/pom.xml
+++ b/pom.xml
@@ -185,7 +185,7 @@
 1.9.13
 2.14.2
 
2.14.2
-    <snappy.version>1.1.10.2</snappy.version>
+    <snappy.version>1.1.10.3</snappy.version>
 3.0.3
 1.15
 1.22

