[spark] branch master updated: [SPARK-43241][PS] `MultiIndex.append` not checking names for equality

2023-09-04 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d7e827e52c45 [SPARK-43241][PS] `MultiIndex.append` not checking names 
for equality
d7e827e52c45 is described below

commit d7e827e52c45cc048635a0ba7dafefc4c1c76463
Author: Haejoon Lee 
AuthorDate: Tue Sep 5 11:45:29 2023 +0800

[SPARK-43241][PS] `MultiIndex.append` not checking names for equality

### What changes were proposed in this pull request?

This PR proposes to fix the behavior of `MultiIndex.append` so that it does not check names.

### Why are the changes needed?

To match the behavior with pandas according to 
https://github.com/pandas-dev/pandas/pull/48288

### Does this PR introduce _any_ user-facing change?

Yes, the behavior is changed to match pandas:

**Testing data**
```python
>>> psmidx1
MultiIndex([('a', 'x', 1),
('b', 'y', 2),
('c', 'z', 3)],
   names=['x', 'y', 'z'])
>>> psmidx2
MultiIndex([('a', 'x', 1),
('b', 'y', 2),
('c', 'z', 3)],
   names=['p', 'q', 'r'])
```

**Before**
```python
>>> psmidx1.append(psmidx2)
MultiIndex([('a', 'x', 1),
('b', 'y', 2),
('c', 'z', 3),
('a', 'x', 1),
('b', 'y', 2),
('c', 'z', 3)],
   names=['x', 'y', 'z'])
```

**After**
```python
>>> psmidx1.append(psmidx2)
MultiIndex([('a', 'x', 1),
('b', 'y', 2),
('c', 'z', 3),
('a', 'x', 1),
('b', 'y', 2),
('c', 'z', 3)],
   )
```
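
If the original names are still needed after appending, they can be restored explicitly with `rename`. A minimal illustrative sketch using the test data above (not part of this change):
```python
>>> psmidx1.append(psmidx2).rename(["x", "y", "z"])
MultiIndex([('a', 'x', 1),
            ('b', 'y', 2),
            ('c', 'z', 3),
            ('a', 'x', 1),
            ('b', 'y', 2),
            ('c', 'z', 3)],
           names=['x', 'y', 'z'])
```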

### How was this patch tested?

Fixed the existing UTs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #42787 from itholic/SPARK-43241.

Authored-by: Haejoon Lee 
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/pandas/indexes/base.py  |  8 +--
 .../pyspark/pandas/tests/indexes/test_base_slow.py | 26 +++---
 2 files changed, 4 insertions(+), 30 deletions(-)

diff --git a/python/pyspark/pandas/indexes/base.py 
b/python/pyspark/pandas/indexes/base.py
index bfde7e554ba5..c7d9f4e57467 100644
--- a/python/pyspark/pandas/indexes/base.py
+++ b/python/pyspark/pandas/indexes/base.py
@@ -1917,18 +1917,12 @@ class Index(IndexOpsMixin):
 sdf_other = 
other._internal.spark_frame.select(other._internal.index_spark_columns)
 sdf_appended = sdf_self.union(sdf_other)
 
-# names should be kept when MultiIndex, but Index wouldn't keep its 
name.
-if isinstance(self, MultiIndex):
-index_names = self._internal.index_names
-else:
-index_names = None
-
 internal = InternalFrame(
 spark_frame=sdf_appended,
 index_spark_columns=[
 scol_for(sdf_appended, col) for col in 
self._internal.index_spark_column_names
 ],
-index_names=index_names,
+index_names=None,
 index_fields=index_fields,
 )
 
diff --git a/python/pyspark/pandas/tests/indexes/test_base_slow.py 
b/python/pyspark/pandas/tests/indexes/test_base_slow.py
index b2a9e52bc61a..ca38b2ff9aff 100644
--- a/python/pyspark/pandas/tests/indexes/test_base_slow.py
+++ b/python/pyspark/pandas/tests/indexes/test_base_slow.py
@@ -107,29 +107,9 @@ class IndexesSlowTestsMixin:
 psmidx1 = ps.from_pandas(pmidx1)
 psmidx2 = ps.from_pandas(pmidx2)
 
-# TODO(SPARK-43241): MultiIndex.append not checking names for equality.
-# Also refer to https://github.com/pandas-dev/pandas/pull/48288.
-if LooseVersion(pd.__version__) >= LooseVersion("2.0.0"):
-self.assert_eq(
-pmidx1.append(pmidx2), psmidx1.append(psmidx2).rename([None, 
None, None])
-)
-else:
-self.assert_eq(pmidx1.append(pmidx2), psmidx1.append(psmidx2))
-
-if LooseVersion(pd.__version__) >= LooseVersion("2.0.0"):
-self.assert_eq(
-pmidx2.append(pmidx1), psmidx2.append(psmidx1).rename([None, 
None, None])
-)
-else:
-self.assert_eq(pmidx2.append(pmidx1), psmidx2.append(psmidx1))
-
-if LooseVersion(pd.__version__) >= LooseVersion("2.0.0"):
-self.assert_eq(
-pmidx1.append(pmidx2).names,
-psmidx1.append(psmidx2).rename([None, None, None]).names,
-)
-else:
-self.assert_eq(pmidx1.append(pmidx2).names, 
psmidx1.append(psmidx2).names)
+self.assert_eq(pmidx1.append(pmidx2), psmidx1.append(psmidx2))
+self.assert_eq(pmidx2.append(pmidx1), 

[spark] branch master updated: [SPARK-45073][PS][CONNECT] Replace `LastNotNull` with `Last(ignoreNulls=True)`

2023-09-04 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4686c2733702 [SPARK-45073][PS][CONNECT] Replace `LastNotNull` with 
`Last(ignoreNulls=True)`
4686c2733702 is described below

commit 4686c27337025dd1a616da73b19abe7ea00a4624
Author: Ruifeng Zheng 
AuthorDate: Tue Sep 5 11:13:35 2023 +0800

[SPARK-45073][PS][CONNECT] Replace `LastNotNull` with 
`Last(ignoreNulls=True)`

### What changes were proposed in this pull request?
Replace `LastNotNull` with `Last(ignoreNulls=True)`

### Why are the changes needed?

https://github.com/apache/spark/pull/36127 introduced a PS-dedicated expression `LastNotNull`, which is not actually needed and can be replaced with the built-in `Last`.
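
A minimal sketch of the equivalent semantics with the built-in function (the DataFrame and column names below are illustrative, not from this patch):
```python
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, None), (2, 1), (3, None), (4, 3)], ["ts", "v"])

# `last` with ignorenulls=True over a running window keeps the last non-null
# value seen so far, which is what `LastNotNull` provided.
w = Window.orderBy("ts").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("v_filled", F.last("v", ignorenulls=True).over(w)).show()
```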

### Does this PR introduce _any_ user-facing change?
NO

### How was this patch tested?
CI

### Was this patch authored or co-authored using generative AI tooling?
NO

Closes #42808 from zhengruifeng/del_last_not_none.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 .../sql/connect/planner/SparkConnectPlanner.scala  |  4 ---
 python/pyspark/pandas/series.py|  2 +-
 python/pyspark/pandas/spark/functions.py   | 14 
 .../catalyst/expressions/windowExpressions.scala   | 37 --
 .../spark/sql/api/python/PythonSQLUtils.scala  |  2 --
 5 files changed, 1 insertion(+), 58 deletions(-)

diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 579b378d09f6..1a63c9fc27c6 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -1905,10 +1905,6 @@ class SparkConnectPlanner(val sessionHolder: 
SessionHolder) extends Logging {
 val ignoreNA = extractBoolean(children(2), "ignoreNA")
 Some(EWM(children(0), alpha, ignoreNA))
 
-  case "last_non_null" if fun.getArgumentsCount == 1 =>
-val children = fun.getArgumentsList.asScala.map(transformExpression)
-Some(LastNonNull(children(0)))
-
   case "null_index" if fun.getArgumentsCount == 1 =>
 val children = fun.getArgumentsList.asScala.map(transformExpression)
 Some(NullIndex(children(0)))
diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index 7fa08c6d9b24..863e98c42ead 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -2257,7 +2257,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
 return self._psdf.copy()._psser_for(self._column_label)
 
 scol = self.spark.column
-last_non_null = SF.last_non_null(scol)
+last_non_null = F.last(scol, True)
 null_index = SF.null_index(scol)
 
 Window = get_window_class()
diff --git a/python/pyspark/pandas/spark/functions.py 
b/python/pyspark/pandas/spark/functions.py
index d6f6c6fdeebc..b0bc6efcd56e 100644
--- a/python/pyspark/pandas/spark/functions.py
+++ b/python/pyspark/pandas/spark/functions.py
@@ -159,20 +159,6 @@ def ewm(col: Column, alpha: float, ignore_na: bool) -> 
Column:
 return Column(sc._jvm.PythonSQLUtils.ewm(col._jc, alpha, ignore_na))
 
 
-def last_non_null(col: Column) -> Column:
-if is_remote():
-from pyspark.sql.connect.functions import _invoke_function_over_columns
-
-return _invoke_function_over_columns(  # type: ignore[return-value]
-"last_non_null",
-col,  # type: ignore[arg-type]
-)
-
-else:
-sc = SparkContext._active_spark_context
-return Column(sc._jvm.PythonSQLUtils.lastNonNull(col._jc))
-
-
 def null_index(col: Column) -> Column:
 if is_remote():
 from pyspark.sql.connect.functions import _invoke_function_over_columns
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
index 50c98c01645d..bc61170f567f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -1152,43 +1152,6 @@ case class EWM(input: Expression, alpha: Double, 
ignoreNA: Boolean)
 }
 
 
-/**
- * Keep the last non-null value seen if any. This expression is dedicated only 
for
- * Pandas API on Spark.
- * For example,
- *  Input: null, 1, 2, 3, null, 4, 5, null
- *  Output: null, 1, 2, 3, 3, 4, 5, 5
- */
-case class 

[spark] branch master updated: [SPARK-45049][CONNECT][DOCS][TESTS] Refine docstrings of `coalesce/repartition/repartitionByRange`

2023-09-04 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 66fb2250224f [SPARK-45049][CONNECT][DOCS][TESTS] Refine docstrings of 
`coalesce/repartition/repartitionByRange`
66fb2250224f is described below

commit 66fb2250224ff9ffb71bf2b320ec05d1b33145c2
Author: Ruifeng Zheng 
AuthorDate: Tue Sep 5 11:09:31 2023 +0900

[SPARK-45049][CONNECT][DOCS][TESTS] Refine docstrings of 
`coalesce/repartition/repartitionByRange`

### What changes were proposed in this pull request?
Enable doctests for `coalesce/repartition/repartitionByRange`, by using 
`explain` instead of `rdd.getNumPartitions`

### Why are the changes needed?
test coverage

### Does this PR introduce _any_ user-facing change?
yes

### How was this patch tested?
updated doctests

### Was this patch authored or co-authored using generative AI tooling?
NO

Closes #42770 from zhengruifeng/enable_doctest_partition.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/connect/dataframe.py |   4 --
 python/pyspark/sql/dataframe.py | 117 +++-
 2 files changed, 99 insertions(+), 22 deletions(-)

diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index 86a635361858..b22fdc1383cf 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -2191,10 +2191,6 @@ def _test() -> None:
 os.chdir(os.environ["SPARK_HOME"])
 
 globs = pyspark.sql.connect.dataframe.__dict__.copy()
-# Spark Connect does not support RDD but the tests depend on them.
-del pyspark.sql.connect.dataframe.DataFrame.coalesce.__doc__
-del pyspark.sql.connect.dataframe.DataFrame.repartition.__doc__
-del pyspark.sql.connect.dataframe.DataFrame.repartitionByRange.__doc__
 
 # TODO(SPARK-41625): Support Structured Streaming
 del pyspark.sql.connect.dataframe.DataFrame.isStreaming.__doc__
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 4b8bdd1c2779..3d7bdd7a0b2b 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1763,9 +1763,27 @@ class DataFrame(PandasMapOpsMixin, 
PandasConversionMixin):
 
 Examples
 
->>> df = spark.range(10)
->>> df.coalesce(1).rdd.getNumPartitions()
-1
+>>> from pyspark.sql import functions as sf
+>>> spark.range(0, 10, 1, 3).select(
+... sf.spark_partition_id().alias("partition")
+... ).distinct().sort("partition").show()
++-+
+|partition|
++-+
+|0|
+|1|
+|2|
++-+
+
+>>> from pyspark.sql import functions as sf
+>>> spark.range(0, 10, 1, 3).coalesce(1).select(
+... sf.spark_partition_id().alias("partition")
+... ).distinct().sort("partition").show()
++-+
+|partition|
++-+
+|0|
++-+
 """
 return DataFrame(self._jdf.coalesce(numPartitions), self.sparkSession)
 
@@ -1809,23 +1827,78 @@ class DataFrame(PandasMapOpsMixin, 
PandasConversionMixin):
 
 Examples
 
->>> df = spark.createDataFrame(
-... [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
+>>> from pyspark.sql import functions as sf
+>>> df = spark.range(0, 64, 1, 9).withColumn(
+... "name", sf.concat(sf.lit("name_"), sf.col("id").cast("string"))
+... ).withColumn(
+... "age", sf.col("id") - 32
+... )
+>>> df.select(
+... sf.spark_partition_id().alias("partition")
+... ).distinct().sort("partition").show()
++-+
+|partition|
++-+
+|0|
+|1|
+|2|
+|3|
+|4|
+|5|
+|6|
+|7|
+|8|
++-+
 
 Repartition the data into 10 partitions.
 
->>> df.repartition(10).rdd.getNumPartitions()
-10
+>>> df.repartition(10).select(
+... sf.spark_partition_id().alias("partition")
+... ).distinct().sort("partition").show()
++-+
+|partition|
++-+
+|0|
+|1|
+|2|
+|3|
+|4|
+|5|
+|6|
+|7|
+|8|
+|9|
++-+
 
 Repartition the data into 7 partitions by 'age' column.
 
->>> df.repartition(7, "age").rdd.getNumPartitions()
-

[spark] branch master updated: [SPARK-42619][PS] Add `show_counts` parameter for DataFrame.info

2023-09-04 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8d358cdbd57 [SPARK-42619][PS] Add `show_counts` parameter for 
DataFrame.info
8d358cdbd57 is described below

commit 8d358cdbd57e69a16c914f329d3e4173ceb7b1ef
Author: zhyhimont 
AuthorDate: Tue Sep 5 08:45:57 2023 +0800

[SPARK-42619][PS] Add `show_counts` parameter for DataFrame.info

### What changes were proposed in this pull request?

Added `show_counts` parameter for DataFrame.info

### Why are the changes needed?

We should match the behavior of pandas 2.0.0 in pandas API on Spark.

### Does this PR introduce _any_ user-facing change?

The parameter `null_counts` of `DataFrame.info` has been renamed to `show_counts`.
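
A minimal sketch of the new parameter (the data here is illustrative):
```python
import pyspark.pandas as ps

psdf = ps.DataFrame({"int_col": [1, 2, None], "text_col": ["a", None, "c"]})

# Prints the per-column non-null counts; the removed `null_counts` argument
# is no longer accepted.
psdf.info(show_counts=True)
```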

### How was this patch tested?

UT

Closes #40436 from dzhigimont/SPARK-42619_ZH.

Lead-authored-by: zhyhimont 
Co-authored-by: Zhyhimont Dmitry 
Signed-off-by: Ruifeng Zheng 
---
 .../source/migration_guide/pyspark_upgrade.rst |  1 +
 python/pyspark/pandas/frame.py |  7 +++---
 python/pyspark/pandas/indexes/base.py  |  2 +-
 python/pyspark/pandas/tests/io/test_io.py  | 28 ++
 4 files changed, 34 insertions(+), 4 deletions(-)

diff --git a/python/docs/source/migration_guide/pyspark_upgrade.rst 
b/python/docs/source/migration_guide/pyspark_upgrade.rst
index 9ec38ad2709..8b3058ba547 100644
--- a/python/docs/source/migration_guide/pyspark_upgrade.rst
+++ b/python/docs/source/migration_guide/pyspark_upgrade.rst
@@ -39,6 +39,7 @@ Upgrading from PySpark 3.5 to 4.0
 * In Spark 4.0, the default value of ``regex`` parameter for 
``Series.str.replace`` has been changed from ``True`` to ``False`` from pandas 
API on Spark. Additionally, a single character ``pat`` with ``regex=True`` is 
now treated as a regular expression instead of a string literal.
 * In Spark 4.0, the resulting name from ``value_counts`` for all objects sets 
to ``'count'`` (or ``'proportion'`` if ``normalize=True`` was passed) from 
pandas API on Spark, and the index will be named after the original object.
 * In Spark 4.0, ``squeeze`` parameter from ``ps.read_csv`` and 
``ps.read_excel`` has been removed from pandas API on Spark.
+* In Spark 4.0, ``null_counts`` parameter from ``DataFrame.info`` has been 
removed from pandas API on Spark, use ``show_counts`` instead.
 
 
 Upgrading from PySpark 3.3 to 3.4
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 41ab03a5c0b..adbef607256 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -11948,12 +11948,12 @@ defaultdict(, {'col..., 'col...})]
 
 return cast(ps.Series, 
ps.from_pandas(psdf._to_internal_pandas().idxmin()))
 
-# TODO(SPARK-41619): Add `show_counts` parameter and replace with 
`null_counts`.
 def info(
 self,
 verbose: Optional[bool] = None,
 buf: Optional[IO[str]] = None,
 max_cols: Optional[int] = None,
+show_counts: Optional[bool] = None,
 ) -> None:
 """
 Print a concise summary of a DataFrame.
@@ -11973,10 +11973,10 @@ defaultdict(, {'col..., 'col...})]
 When to switch from the verbose to the truncated output. If the
 DataFrame has more than `max_cols` columns, the truncated output
 is used.
-null_counts : bool, optional
+show_counts : bool, optional
 Whether to show the non-null counts.
 
-.. deprecated:: 3.4.0
+.. versionadded:: 4.0.0
 
 Returns
 ---
@@ -12066,6 +12066,7 @@ defaultdict(, {'col..., 'col...})]
 buf=buf,
 max_cols=max_cols,
 memory_usage=False,
+show_counts=show_counts,  # type: ignore
 )
 finally:
 del self._data
diff --git a/python/pyspark/pandas/indexes/base.py 
b/python/pyspark/pandas/indexes/base.py
index 4c2ab137435..bfde7e554ba 100644
--- a/python/pyspark/pandas/indexes/base.py
+++ b/python/pyspark/pandas/indexes/base.py
@@ -289,7 +289,7 @@ class Index(IndexOpsMixin):
 
 if name is None:
 name = type(self).__name__
-return "%s: %s entries%s" % (name, total_count, index_summary)
+return "%s: %s entries%s" % (name, int(total_count), index_summary)
 
 @property
 def size(self) -> int:
diff --git a/python/pyspark/pandas/tests/io/test_io.py 
b/python/pyspark/pandas/tests/io/test_io.py
index 4eadc6a7eb5..59812ae3d5a 100644
--- a/python/pyspark/pandas/tests/io/test_io.py
+++ b/python/pyspark/pandas/tests/io/test_io.py
@@ -16,6 +16,7 @@
 #
 from distutils.version import LooseVersion
 import unittest
+from io 

[spark] branch master updated (d17a8613a68 -> ebdbc7b1d3e)

2023-09-04 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from d17a8613a68 [SPARK-45047][PYTHON][CONNECT] `DataFrame.groupBy` support 
ordinals
 add ebdbc7b1d3e [SPARK-45064][PYTHON][CONNECT] Add the missing `scale` 
parameter in `ceil/ceiling`

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/connect/functions.py | 16 ++---
 python/pyspark/sql/functions.py | 58 +
 2 files changed, 57 insertions(+), 17 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-45047][PYTHON][CONNECT] `DataFrame.groupBy` support ordinals

2023-09-04 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d17a8613a68 [SPARK-45047][PYTHON][CONNECT] `DataFrame.groupBy` support 
ordinals
d17a8613a68 is described below

commit d17a8613a68af076bc796881831382c29df4d90e
Author: Ruifeng Zheng 
AuthorDate: Mon Sep 4 15:23:08 2023 -0700

[SPARK-45047][PYTHON][CONNECT] `DataFrame.groupBy` support ordinals

### What changes were proposed in this pull request?

make `DataFrame.groupBy` accept ordinals

### Why are the changes needed?

for feature parity

```
select target_country, ua_date, sum(spending_usd)
from df
group by 2, 1
order by 2, 3 desc
```

this PR focuses on the `groupBy` method

### Does this PR introduce _any_ user-facing change?
yes, new feature

```
In [2]: from pyspark.sql import functions as sf

In [3]: df = spark.createDataFrame([(1, 1), (1, 2), (2, 1), (2, 2), (3, 1), 
(3, 2)], ["a", "b"])

In [4]: df.select("a", sf.lit(1), "b").groupBy("a", 
2).agg(sf.sum("b")).show()
+---+---+--+
|  a|  1|sum(b)|
+---+---+--+
|  1|  1| 3|
|  2|  1| 3|
|  3|  1| 3|
+---+---+--+
```
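
A minimal follow-on sketch (reusing the `df` above; output ordered for readability): grouping by ordinal `1` is equivalent to grouping by the first selected column, since ordinals are 1-based.
```
In [5]: df.select("a", "b").groupBy(1).agg(sf.sum("b")).orderBy("a").show()
+---+------+
|  a|sum(b)|
+---+------+
|  1|     3|
|  2|     3|
|  3|     3|
+---+------+
```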

### How was this patch tested?
added ut

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #42767 from zhengruifeng/py_groupby_index.

Authored-by: Ruifeng Zheng 
Signed-off-by: Dongjoon Hyun 
---
 python/pyspark/sql/_typing.pyi |  1 +
 python/pyspark/sql/connect/_typing.py  |  2 +
 python/pyspark/sql/connect/dataframe.py|  9 ++-
 python/pyspark/sql/dataframe.py| 66 --
 python/pyspark/sql/tests/test_group.py | 61 
 python/pyspark/sql/tests/typing/test_dataframe.yml |  2 +-
 6 files changed, 133 insertions(+), 8 deletions(-)

diff --git a/python/pyspark/sql/_typing.pyi b/python/pyspark/sql/_typing.pyi
index 3d095f55709..cee44c4aa06 100644
--- a/python/pyspark/sql/_typing.pyi
+++ b/python/pyspark/sql/_typing.pyi
@@ -36,6 +36,7 @@ from pyspark.sql.column import Column
 
 ColumnOrName = Union[Column, str]
 ColumnOrName_ = TypeVar("ColumnOrName_", bound=ColumnOrName)
+ColumnOrNameOrOrdinal = Union[Column, str, int]
 DecimalLiteral = decimal.Decimal
 DateTimeLiteral = Union[datetime.datetime, datetime.date]
 LiteralType = PrimitiveType
diff --git a/python/pyspark/sql/connect/_typing.py 
b/python/pyspark/sql/connect/_typing.py
index 4c76e37659c..471af24f40d 100644
--- a/python/pyspark/sql/connect/_typing.py
+++ b/python/pyspark/sql/connect/_typing.py
@@ -37,6 +37,8 @@ from pyspark.sql.streaming.state import GroupState
 
 ColumnOrName = Union[Column, str]
 
+ColumnOrNameOrOrdinal = Union[Column, str, int]
+
 PrimitiveType = Union[bool, float, int, str]
 
 OptionalPrimitiveType = Optional[PrimitiveType]
diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index c42de589f8d..86a63536185 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -85,6 +85,7 @@ from pyspark.sql.pandas.types import from_arrow_schema
 if TYPE_CHECKING:
 from pyspark.sql.connect._typing import (
 ColumnOrName,
+ColumnOrNameOrOrdinal,
 LiteralType,
 PrimitiveType,
 OptionalPrimitiveType,
@@ -476,7 +477,7 @@ class DataFrame:
 
 first.__doc__ = PySparkDataFrame.first.__doc__
 
-def groupBy(self, *cols: "ColumnOrName") -> GroupedData:
+def groupBy(self, *cols: "ColumnOrNameOrOrdinal") -> GroupedData:
 if len(cols) == 1 and isinstance(cols[0], list):
 cols = cols[0]
 
@@ -486,6 +487,12 @@ class DataFrame:
 _cols.append(c)
 elif isinstance(c, str):
 _cols.append(self[c])
+elif isinstance(c, int) and not isinstance(c, bool):
+# TODO: should introduce dedicated error class
+if c < 1:
+raise IndexError(f"Column ordinal must be positive but got 
{c}")
+# ordinal is 1-based
+_cols.append(self[c - 1])
 else:
 raise PySparkTypeError(
 error_class="NOT_COLUMN_OR_STR",
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 64592311a13..4b8bdd1c277 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -67,7 +67,12 @@ from pyspark.sql.pandas.map_ops import PandasMapOpsMixin
 if TYPE_CHECKING:
 from pyspark._typing import PrimitiveType
 from pyspark.pandas.frame import DataFrame as PandasOnSparkDataFrame
-from pyspark.sql._typing import ColumnOrName, LiteralType, 

[spark] branch master updated: [SPARK-45059][CONNECT][FOLLOWUP] Remove `try_reflect` problem filter rule in connect

2023-09-04 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 96e1e86b0a8 [SPARK-45059][CONNECT][FOLLOWUP] Remove `try_reflect` 
problem filter rule in connect
96e1e86b0a8 is described below

commit 96e1e86b0a8f296845d697b0019d7ed16864c938
Author: Jia Fan 
AuthorDate: Mon Sep 4 15:14:55 2023 -0700

[SPARK-45059][CONNECT][FOLLOWUP] Remove `try_reflect` problem filter rule 
in connect

### What changes were proposed in this pull request?
This is a follow-up PR for #42783 to remove the `try_reflect` problem filter rule in Spark Connect.

### Why are the changes needed?
Make sure the `try_reflect` check works.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #42800 from Hisoka-X/SPARK-45059_remove_connect_check.

Authored-by: Jia Fan 
Signed-off-by: Dongjoon Hyun 
---
 .../spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala   | 2 --
 1 file changed, 2 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index bf512ed71fd..1e536cd37fe 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -208,8 +208,6 @@ object CheckConnectJvmClientCompatibility {
   // functions
   
ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.unwrap_udt"),
   ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.udaf"),
-  ProblemFilters.exclude[DirectMissingMethodProblem](
-"org.apache.spark.sql.functions.try_reflect"),
 
   // KeyValueGroupedDataset
   ProblemFilters.exclude[Problem](


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-45066][SQL][PYTHON][CONNECT] Make function `repeat` accept column-type `n`

2023-09-04 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ba20eaa4c30 [SPARK-45066][SQL][PYTHON][CONNECT] Make function `repeat` 
accept column-type `n`
ba20eaa4c30 is described below

commit ba20eaa4c30aecb32ba2deb7bbf502bec929a297
Author: Ruifeng Zheng 
AuthorDate: Mon Sep 4 15:04:25 2023 -0700

[SPARK-45066][SQL][PYTHON][CONNECT] Make function `repeat` accept 
column-type `n`

### What changes were proposed in this pull request?
Make function `repeat` accept column-type `n`
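
A minimal sketch of the new capability (DataFrame and column names are illustrative): `n` may now be a `Column` holding a per-row repeat count, in addition to an `int`.
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("ab", 2), ("cd", 3)], ["s", "n"])

# Each row is repeated according to its own `n`: 'abab' and 'cdcdcd'.
df.select(sf.repeat("s", sf.col("n")).alias("r")).show()
```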

### Why are the changes needed?

1. to follow this guide: 
https://github.com/apache/spark/blob/5b609598503df603cbddd5e1adf8d2cb28a5f977/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L60-L62
2. especially, it can replace [the internal function](https://github.com/apache/spark/blob/17fac569b4e4b569d41f761db07d7bf112801e0c/python/pyspark/pandas/spark/functions.py#L138-L143) in the Pandas API (to keep this PR clean, I will replace it in a separate PR)

### Does this PR introduce _any_ user-facing change?
yes

### How was this patch tested?
NO

### Was this patch authored or co-authored using generative AI tooling?
NO

Closes #42794 from zhengruifeng/func_repeat_func.

Authored-by: Ruifeng Zheng 
Signed-off-by: Dongjoon Hyun 
---
 .../scala/org/apache/spark/sql/functions.scala |  8 +
 python/pyspark/sql/connect/functions.py|  5 +--
 python/pyspark/sql/functions.py| 42 ++
 .../scala/org/apache/spark/sql/functions.scala | 10 ++
 4 files changed, 57 insertions(+), 8 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
index 9ead800ace7..527848e95e6 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
@@ -4100,6 +4100,14 @@ object functions {
*/
   def repeat(str: Column, n: Int): Column = Column.fn("repeat", str, lit(n))
 
+  /**
+   * Repeats a string column n times, and returns it as a new string column.
+   *
+   * @group string_funcs
+   * @since 4.0.0
+   */
+  def repeat(str: Column, n: Column): Column = Column.fn("repeat", str, n)
+
   /**
* Trim the spaces from right end for the specified string value.
*
diff --git a/python/pyspark/sql/connect/functions.py 
b/python/pyspark/sql/connect/functions.py
index f290549ae47..19dd021ba08 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions.py
@@ -2357,8 +2357,9 @@ def rpad(col: "ColumnOrName", len: int, pad: str) -> 
Column:
 rpad.__doc__ = pysparkfuncs.rpad.__doc__
 
 
-def repeat(col: "ColumnOrName", n: int) -> Column:
-return _invoke_function("repeat", _to_col(col), lit(n))
+def repeat(col: "ColumnOrName", n: Union["ColumnOrName", int]) -> Column:
+n = lit(n) if isinstance(n, int) else n
+return _invoke_function("repeat", _to_col(col), _to_col(n))
 
 
 repeat.__doc__ = pysparkfuncs.repeat.__doc__
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 6e0caf50c16..699c8b9c8cf 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -10020,7 +10020,7 @@ def rpad(col: "ColumnOrName", len: int, pad: str) -> 
Column:
 
 
 @try_remote_functions
-def repeat(col: "ColumnOrName", n: int) -> Column:
+def repeat(col: "ColumnOrName", n: Union["ColumnOrName", int]) -> Column:
 """
 Repeats a string column n times, and returns it as a new string column.
 
@@ -10033,9 +10033,12 @@ def repeat(col: "ColumnOrName", n: int) -> Column:
 --
 col : :class:`~pyspark.sql.Column` or str
 target column to work on.
-n : int
+n : :class:`~pyspark.sql.Column` or str or int
 number of times to repeat value.
 
+.. versionchanged:: 4.0.0
+   `n` now accepts column and column name.
+
 Returns
 ---
 :class:`~pyspark.sql.Column`
@@ -10043,11 +10046,38 @@ def repeat(col: "ColumnOrName", n: int) -> Column:
 
 Examples
 
->>> df = spark.createDataFrame([('ab',)], ['s',])
->>> df.select(repeat(df.s, 3).alias('s')).collect()
-[Row(s='ababab')]
+>>> import pyspark.sql.functions as sf
+>>> spark.createDataFrame(
+... [('ab',)], ['s',]
+... ).select(sf.repeat("s", 3)).show()
+++
+|repeat(s, 3)|
+++
+|  ababab|
+++
+
+>>> import pyspark.sql.functions as sf
+>>> spark.createDataFrame(
+... [('ab',)], ['s',]
+... ).select(sf.repeat("s", sf.lit(4))).show()
+++
+

[spark] branch master updated: [SPARK-45063][PYTHON][DOCS] Refine docstring of `max_by/min_by`

2023-09-04 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 798fce3b571 [SPARK-45063][PYTHON][DOCS] Refine docstring of 
`max_by/min_by`
798fce3b571 is described below

commit 798fce3b571907ee52058004cc38c2e8dbc4b016
Author: yangjie01 
AuthorDate: Mon Sep 4 14:48:14 2023 -0700

[SPARK-45063][PYTHON][DOCS] Refine docstring of `max_by/min_by`

### What changes were proposed in this pull request?
This PR refines the docstrings of `max_by/min_by` and adds some new examples.

### Why are the changes needed?
To improve PySpark documentation

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass Github Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #42789 from LuciferYang/SPARK-45063.

Authored-by: yangjie01 
Signed-off-by: Dongjoon Hyun 
---
 python/pyspark/sql/functions.py | 96 -
 1 file changed, 86 insertions(+), 10 deletions(-)

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index d025b13cd10..6e0caf50c16 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1346,7 +1346,9 @@ def min(col: "ColumnOrName") -> Column:
 @try_remote_functions
 def max_by(col: "ColumnOrName", ord: "ColumnOrName") -> Column:
 """
-Returns the value associated with the maximum value of ord.
+Returns the value from the `col` parameter that is associated with the 
maximum value
+from the `ord` parameter. This function is often used to find the `col` 
parameter value
+corresponding to the maximum `ord` parameter value within each group when 
used with groupBy().
 
 .. versionadded:: 3.3.0
 
@@ -1356,28 +1358,64 @@ def max_by(col: "ColumnOrName", ord: "ColumnOrName") -> 
Column:
 Parameters
 --
 col : :class:`~pyspark.sql.Column` or str
-target column to compute on.
+The column representing the values to be returned. This could be the 
column instance
+or the column name as string.
 ord : :class:`~pyspark.sql.Column` or str
-column to be maximized
+The column that needs to be maximized. This could be the column 
instance
+or the column name as string.
 
 Returns
 ---
 :class:`~pyspark.sql.Column`
-value associated with the maximum value of ord.
+A column object representing the value from `col` that is associated 
with
+the maximum value from `ord`.
 
 Examples
 
+Example 1: Using `max_by` with groupBy
+
+>>> import pyspark.sql.functions as sf
 >>> df = spark.createDataFrame([
 ... ("Java", 2012, 2), ("dotNET", 2012, 5000),
 ... ("dotNET", 2013, 48000), ("Java", 2013, 3)],
 ... schema=("course", "year", "earnings"))
->>> df.groupby("course").agg(max_by("year", "earnings")).show()
+>>> df.groupby("course").agg(sf.max_by("year", "earnings")).show()
 +--+--+
 |course|max_by(year, earnings)|
 +--+--+
 |  Java|  2013|
 |dotNET|  2013|
 +--+--+
+
+Example 2: Using `max_by` with different data types
+
+>>> import pyspark.sql.functions as sf
+>>> df = spark.createDataFrame([
+... ("Marketing", "Anna", 4), ("IT", "Bob", 2),
+... ("IT", "Charlie", 3), ("Marketing", "David", 1)],
+... schema=("department", "name", "years_in_dept"))
+>>> df.groupby("department").agg(sf.max_by("name", "years_in_dept")).show()
++--+---+
+|department|max_by(name, years_in_dept)|
++--+---+
+|IT|Charlie|
+| Marketing|   Anna|
++--+---+
+
+Example 3: Using `max_by` where `ord` has multiple maximum values
+
+>>> import pyspark.sql.functions as sf
+>>> df = spark.createDataFrame([
+... ("Consult", "Eva", 6), ("Finance", "Frank", 5),
+... ("Finance", "George", 5), ("Consult", "Henry", 7)],
+... schema=("department", "name", "years_in_dept"))
+>>> df.groupby("department").agg(sf.max_by("name", "years_in_dept")).show()
++--+---+
+|department|max_by(name, years_in_dept)|
++--+---+
+|   Consult|  Henry|
+|   Finance| George|
++--+---+
 """
 return _invoke_function_over_columns("max_by", col, ord)
 
@@ -1385,7 +1423,9 @@ def max_by(col: "ColumnOrName", ord: "ColumnOrName") -> 
Column:
 

[spark] branch master updated: [SPARK-45031][INFRA] Choose the right merge code path and merge hash for reopened PRs

2023-09-04 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 792b37c1ab4 [SPARK-45031][INFRA] Choose the right merge code path and 
merge hash for reopened PRs
792b37c1ab4 is described below

commit 792b37c1ab4658b81f2e5f06d28e438af53988fb
Author: Kent Yao 
AuthorDate: Mon Sep 4 14:45:45 2023 -0700

[SPARK-45031][INFRA] Choose the right merge code path and merge hash for 
reopened PRs

### What changes were proposed in this pull request?

When deciding whether to cherry-pick a PR, we also check that the PR is in the closed state; otherwise, we assume it was reverted and reopened, and we perform a normal merge.

When cherry-picking, we shall select the merge hash from the latest commit-close event instead of the oldest. This is a bug fix.

### Why are the changes needed?

Bugfix

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

Manually.

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #42749 from yaooqinn/SPARK-45031.

Authored-by: Kent Yao 
Signed-off-by: Dongjoon Hyun 
---
 dev/merge_spark_pr.py | 9 +++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/dev/merge_spark_pr.py b/dev/merge_spark_pr.py
index 23f5af7daca..4021999f19b 100755
--- a/dev/merge_spark_pr.py
+++ b/dev/merge_spark_pr.py
@@ -615,8 +615,13 @@ def main():
 # Instead, they're closed by committers.
 merge_commits = [e for e in pr_events if e["event"] == "closed" and 
e["commit_id"] is not None]
 
-if merge_commits:
-merge_hash = merge_commits[0]["commit_id"]
+if merge_commits and pr["state"] == "closed":
+# A PR might have multiple merge commits, if it's reopened and merged 
again. We shall
+# cherry-pick PRs in closed state with the latest merge hash.
+# If the PR is still open(reopened), we shall not cherry-pick it but 
perform the normal
+# merge as it could have been reverted earlier.
+merge_commits = sorted(merge_commits, key=lambda x: x["created_at"])
+merge_hash = merge_commits[-1]["commit_id"]
 message = get_json("%s/commits/%s" % (GITHUB_API_BASE, 
merge_hash))["commit"]["message"]
 
 print("Pull request %s has already been merged, assuming you want to 
backport" % pr_num)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.4 updated: [SPARK-44940][SQL][3.4] Improve performance of JSON parsing when "spark.sql.json.enablePartialResults" is enabled

2023-09-04 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new daf481d9505 [SPARK-44940][SQL][3.4] Improve performance of JSON 
parsing when "spark.sql.json.enablePartialResults" is enabled
daf481d9505 is described below

commit daf481d950564efc01fb99628dded08ad1f51ff2
Author: Ivan Sadikov 
AuthorDate: Mon Sep 4 14:39:06 2023 -0700

[SPARK-44940][SQL][3.4] Improve performance of JSON parsing when 
"spark.sql.json.enablePartialResults" is enabled

### What changes were proposed in this pull request?

Backport of https://github.com/apache/spark/pull/42667 to branch-3.4.

The PR improves JSON parsing when `spark.sql.json.enablePartialResults` is 
enabled:
- Fixes the issue when using nested arrays `ClassCastException: 
org.apache.spark.sql.catalyst.util.GenericArrayData cannot be cast to 
org.apache.spark.sql.catalyst.InternalRow`
- Improves parsing of the nested struct fields, e.g. `{"a1": "AAA", "a2": 
[{"f1": "", "f2": ""}], "a3": "id1", "a4": "XXX"}` used to be parsed as 
`|AAA|NULL|NULL|NULL|` and now is parsed as `|AAA|[{NULL, }]|id1|XXX|`.
- Improves performance of nested JSON parsing. The initial implementation 
would throw too many exceptions when multiple nested fields failed to parse. 
When the config is disabled, it is not a problem because the entire record is 
marked as NULL.

The internal benchmarks show that performance goes from a slowdown of over 160% to an improvement of 7-8% compared to the master branch when the flag is enabled. I will create a follow-up ticket to add a benchmark for this regression.

### Why are the changes needed?

Fixes some corner cases in JSON parsing and improves performance when 
`spark.sql.json.enablePartialResults` is enabled.
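
A minimal sketch of the behavior this config controls (schema and data are illustrative, mirroring the example above; exact output depends on the Spark version):
```python
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructField, StructType

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.json.enablePartialResults", "true")

schema = StructType([
    StructField("a1", StringType()),
    StructField("a2", ArrayType(StructType([
        StructField("f1", IntegerType()),  # "" cannot be parsed as an integer
        StructField("f2", StringType()),
    ]))),
    StructField("a3", StringType()),
    StructField("a4", StringType()),
])

data = ['{"a1": "AAA", "a2": [{"f1": "", "f2": ""}], "a3": "id1", "a4": "XXX"}']
df = spark.read.schema(schema).json(spark.sparkContext.parallelize(data))

# With partial results enabled, the valid fields of the record are kept
# (roughly |AAA|[{NULL, }]|id1|XXX|) instead of the whole row becoming NULL.
df.show()
```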

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I added tests to verify nested structs, maps, and arrays can be parsed 
without affecting the subsequent fields in the JSON. I also updated the 
existing tests when `spark.sql.json.enablePartialResults` is enabled because we 
parse more data now.

I added a benchmark to check performance.

Before the change (master, 
https://github.com/apache/spark/commit/a45a3a3d60cb97b107a177ad16bfe36372bc3e9b):
```
[info] OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on 
Linux 5.4.0-1045-aws
[info] Intel(R) Xeon(R) Platinum 8375C CPU  2.90GHz
[info] Partial JSON results: Best Time(ms)   Avg 
Time(ms)   Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
[info] 

[info] parse invalid JSON 9537   
9820 452  0.0  953651.6   1.0X
```

After the change (this PR):
```
OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 
5.4.0-1045-aws
Intel(R) Xeon(R) Platinum 8375C CPU  2.90GHz
Partial JSON results: Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative


parse invalid JSON 3100   3106  
 6  0.0  309967.6   1.0X
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #42792 from sadikovi/SPARK-44940-3.4.

Authored-by: Ivan Sadikov 
Signed-off-by: Dongjoon Hyun 
---
 .../spark/sql/catalyst/json/JacksonParser.scala|  41 -
 .../sql/catalyst/util/BadRecordException.scala |  69 -
 .../spark/sql/errors/QueryExecutionErrors.scala|  12 +-
 sql/core/benchmarks/JsonBenchmark-results.txt  | 153 ++-
 .../org/apache/spark/sql/JsonFunctionsSuite.scala  |  20 ++-
 .../execution/datasources/json/JsonBenchmark.scala |  28 
 .../sql/execution/datasources/json/JsonSuite.scala | 170 -
 7 files changed, 400 insertions(+), 93 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index d9bff3dc7ec..20b281332d0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -420,17 +420,17 @@ class JacksonParser(
 case VALUE_STRING if parser.getTextLength < 1 && allowEmptyString =>
   dataType match {
 case FloatType | DoubleType | 

[spark] branch branch-3.5 updated: [SPARK-44940][SQL][3.5] Improve performance of JSON parsing when "spark.sql.json.enablePartialResults" is enabled

2023-09-04 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 0dea7db3660 [SPARK-44940][SQL][3.5] Improve performance of JSON 
parsing when "spark.sql.json.enablePartialResults" is enabled
0dea7db3660 is described below

commit 0dea7db3660b9db7bbe075c31712e7119bfd1af7
Author: Ivan Sadikov 
AuthorDate: Mon Sep 4 14:37:32 2023 -0700

[SPARK-44940][SQL][3.5] Improve performance of JSON parsing when 
"spark.sql.json.enablePartialResults" is enabled

### What changes were proposed in this pull request?

Backport of https://github.com/apache/spark/pull/42667 to branch-3.5.

The PR improves JSON parsing when `spark.sql.json.enablePartialResults` is 
enabled:
- Fixes the issue when using nested arrays `ClassCastException: 
org.apache.spark.sql.catalyst.util.GenericArrayData cannot be cast to 
org.apache.spark.sql.catalyst.InternalRow`
- Improves parsing of the nested struct fields, e.g. `{"a1": "AAA", "a2": 
[{"f1": "", "f2": ""}], "a3": "id1", "a4": "XXX"}` used to be parsed as 
`|AAA|NULL|NULL|NULL|` and now is parsed as `|AAA|[{NULL, }]|id1|XXX|`.
- Improves performance of nested JSON parsing. The initial implementation 
would throw too many exceptions when multiple nested fields failed to parse. 
When the config is disabled, it is not a problem because the entire record is 
marked as NULL.

The internal benchmarks show that performance goes from a slowdown of over 160% to an improvement of 7-8% compared to the master branch when the flag is enabled. I will create a follow-up ticket to add a benchmark for this regression.

### Why are the changes needed?

Fixes some corner cases in JSON parsing and improves performance when 
`spark.sql.json.enablePartialResults` is enabled.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I added tests to verify nested structs, maps, and arrays can be parsed 
without affecting the subsequent fields in the JSON. I also updated the 
existing tests when `spark.sql.json.enablePartialResults` is enabled because we 
parse more data now.

I added a benchmark to check performance.

Before the change (master, 
https://github.com/apache/spark/commit/a45a3a3d60cb97b107a177ad16bfe36372bc3e9b):
```
[info] OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on 
Linux 5.4.0-1045-aws
[info] Intel(R) Xeon(R) Platinum 8375C CPU  2.90GHz
[info] Partial JSON results: Best Time(ms)   Avg 
Time(ms)   Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
[info] 

[info] parse invalid JSON 9537   
9820 452  0.0  953651.6   1.0X
```

After the change (this PR):
```
OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 
5.4.0-1045-aws
Intel(R) Xeon(R) Platinum 8375C CPU  2.90GHz
Partial JSON results: Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative


parse invalid JSON 3100   3106  
 6  0.0  309967.6   1.0X
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #42790 from sadikovi/SPARK-44940-3.5.

Authored-by: Ivan Sadikov 
Signed-off-by: Dongjoon Hyun 
---
 .../spark/sql/catalyst/json/JacksonParser.scala|  41 -
 .../sql/catalyst/util/BadRecordException.scala |  64 +++-
 .../spark/sql/errors/QueryExecutionErrors.scala|  12 +-
 sql/core/benchmarks/JsonBenchmark-results.txt  | 152 +-
 .../org/apache/spark/sql/JsonFunctionsSuite.scala  |  20 ++-
 .../execution/datasources/json/JsonBenchmark.scala |  28 
 .../sql/execution/datasources/json/JsonSuite.scala | 170 -
 7 files changed, 393 insertions(+), 94 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 388edb9024c..f14f70532e6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -420,17 +420,17 @@ class JacksonParser(
 case VALUE_STRING if parser.getTextLength < 1 && allowEmptyString =>
   dataType match {
 case FloatType | DoubleType | 

[spark] branch master updated: [SPARK-45036][SQL] SPJ: Simplify the logic to handle partially clustered distribution

2023-09-04 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 9e2aafb1373 [SPARK-45036][SQL] SPJ: Simplify the logic to handle 
partially clustered distribution
9e2aafb1373 is described below

commit 9e2aafb13739f9c07f8218cd325c5532063b1a51
Author: Chao Sun 
AuthorDate: Mon Sep 4 14:05:14 2023 -0700

[SPARK-45036][SQL] SPJ: Simplify the logic to handle partially clustered 
distribution

### What changes were proposed in this pull request?

In SPJ, the logic to handle partially clustered distribution is currently a bit complicated. For instance, when the feature is enabled (by enabling both `conf.v2BucketingPushPartValuesEnabled` and `conf.v2BucketingPartiallyClusteredDistributionEnabled`), Spark should postpone the combining of input splits until it is about to create an input RDD in `BatchScanExec`. To implement this, `groupPartitions` in `DataSourceV2ScanExecBase` currently takes the flag as input and has two differen [...]

This PR introduces a new field in `KeyGroupedPartitioning`, named `originalPartitionValues`, that is used to store the original partition values from the input before split combining has been applied. The field is used when partially clustered distribution is enabled. With this, `groupPartitions` becomes easier to understand.

In addition, this also simplifies `BatchScanExec.inputRDD` by combining two 
branches where partially clustered distribution is not enabled.

### Why are the changes needed?

To simplify the current logic in the SPJ w.r.t partially clustered 
distribution.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

Closes #42757 from sunchao/SPARK-45036.

Authored-by: Chao Sun 
Signed-off-by: Dongjoon Hyun 
---
 .../sql/catalyst/plans/physical/partitioning.scala |  35 +++---
 .../execution/datasources/v2/BatchScanExec.scala   | 117 +
 .../datasources/v2/DataSourceV2ScanExecBase.scala  |  65 +++-
 .../execution/exchange/EnsureRequirements.scala|   9 +-
 .../execution/exchange/ShuffleExchangeExec.scala   |   4 +-
 .../DistributionAndOrderingSuiteBase.scala |   6 +-
 .../connector/KeyGroupedPartitioningSuite.scala|   2 +-
 .../exchange/EnsureRequirementsSuite.scala |   2 +-
 8 files changed, 122 insertions(+), 118 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index ce557422a08..0be4a61f275 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -312,26 +312,37 @@ case class HashPartitioning(expressions: Seq[Expression], 
numPartitions: Int)
  * Represents a partitioning where rows are split across partitions based on 
transforms defined
  * by `expressions`. `partitionValues`, if defined, should contain value of 
partition key(s) in
  * ascending order, after evaluated by the transforms in `expressions`, for 
each input partition.
- * In addition, its length must be the same as the number of input partitions 
(and thus is a 1-1
- * mapping). The `partitionValues` may contain duplicated partition values.
+ * In addition, its length must be the same as the number of Spark partitions 
(and thus is a 1-1
+ * mapping), and each row in `partitionValues` must be unique.
  *
- * For example, if `expressions` is `[years(ts_col)]`, then a valid value of 
`partitionValues` is
- * `[0, 1, 2]`, which represents 3 input partitions with distinct partition 
values. All rows
- * in each partition have the same value for column `ts_col` (which is of 
timestamp type), after
- * being applied by the `years` transform.
+ * The `originalPartitionValues`, on the other hand, are partition values from 
the original input
+ * splits returned by data sources. It may contain duplicated values.
  *
- * On the other hand, `[0, 0, 1]` is not a valid value for `partitionValues` 
since `0` is
- * duplicated twice.
+ * For example, if a data source reports partition transform expressions 
`[years(ts_col)]` with 4
+ * input splits whose corresponding partition values are `[0, 1, 2, 2]`, then 
the `expressions`
+ * in this case is `[years(ts_col)]`, while `partitionValues` is `[0, 1, 2]`, 
which
+ * represents 3 input partitions with distinct partition values. All rows in 
each partition have
+ * the same value for column `ts_col` (which is of timestamp type), after 
being applied by the
+ * `years` 

[spark] branch master updated: [SPARK-45067][BUILD] Upgrade slf4j to 2.0.9

2023-09-04 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 85d1c7f3a5d [SPARK-45067][BUILD] Upgrade slf4j to 2.0.9
85d1c7f3a5d is described below

commit 85d1c7f3a5dd0a9162d93b80812a193d8ccfef18
Author: yangjie01 
AuthorDate: Mon Sep 4 09:15:44 2023 -0500

[SPARK-45067][BUILD] Upgrade slf4j to 2.0.9

### What changes were proposed in this pull request?
This PR aims to upgrade slf4j from 2.0.7 to 2.0.9.

### Why are the changes needed?
The release notes as follows:

- https://www.slf4j.org/news.html#2.0.9

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #42796 from LuciferYang/SPARK-45067.

Authored-by: yangjie01 
Signed-off-by: Sean Owen 
---
 dev/deps/spark-deps-hadoop-3-hive-2.3 | 6 +++---
 pom.xml   | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 
b/dev/deps/spark-deps-hadoop-3-hive-2.3
index 59164c1f8f4..652127a9bb8 100644
--- a/dev/deps/spark-deps-hadoop-3-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -118,7 +118,7 @@ javassist/3.29.2-GA//javassist-3.29.2-GA.jar
 javax.jdo/3.2.0-m3//javax.jdo-3.2.0-m3.jar
 javolution/5.5.1//javolution-5.5.1.jar
 jaxb-runtime/2.3.2//jaxb-runtime-2.3.2.jar
-jcl-over-slf4j/2.0.7//jcl-over-slf4j-2.0.7.jar
+jcl-over-slf4j/2.0.9//jcl-over-slf4j-2.0.9.jar
 jdo-api/3.0.1//jdo-api-3.0.1.jar
 jdom2/2.0.6//jdom2-2.0.6.jar
 jersey-client/2.40//jersey-client-2.40.jar
@@ -141,7 +141,7 @@ 
json4s-jackson_2.12/3.7.0-M11//json4s-jackson_2.12-3.7.0-M11.jar
 json4s-scalap_2.12/3.7.0-M11//json4s-scalap_2.12-3.7.0-M11.jar
 jsr305/3.0.0//jsr305-3.0.0.jar
 jta/1.1//jta-1.1.jar
-jul-to-slf4j/2.0.7//jul-to-slf4j-2.0.7.jar
+jul-to-slf4j/2.0.9//jul-to-slf4j-2.0.9.jar
 kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar
 kubernetes-client-api/6.8.1//kubernetes-client-api-6.8.1.jar
 kubernetes-client/6.8.1//kubernetes-client-6.8.1.jar
@@ -233,7 +233,7 @@ 
scala-parser-combinators_2.12/2.3.0//scala-parser-combinators_2.12-2.3.0.jar
 scala-reflect/2.12.18//scala-reflect-2.12.18.jar
 scala-xml_2.12/2.2.0//scala-xml_2.12-2.2.0.jar
 shims/0.9.45//shims-0.9.45.jar
-slf4j-api/2.0.7//slf4j-api-2.0.7.jar
+slf4j-api/2.0.9//slf4j-api-2.0.9.jar
 snakeyaml-engine/2.6//snakeyaml-engine-2.6.jar
 snakeyaml/2.0//snakeyaml-2.0.jar
 snappy-java/1.1.10.3//snappy-java-1.1.10.3.jar
diff --git a/pom.xml b/pom.xml
index efd1c6ffdb9..a61d603fe1c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,7 +119,7 @@
 3.1.0
 spark
 9.5
-2.0.7
+2.0.9
 2.20.0
 
 3.3.6


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-44890][BUILD] Update miswritten remarks

2023-09-04 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ba1c2f3b383 [SPARK-44890][BUILD] Update miswritten remarks
ba1c2f3b383 is described below

commit ba1c2f3b38396c01739375d6e83ac84b581d951e
Author: chenyu-opensource <119398199+chenyu-opensou...@users.noreply.github.com>
AuthorDate: Mon Sep 4 09:12:33 2023 -0500

[SPARK-44890][BUILD] Update miswritten remarks

### What changes were proposed in this pull request?

The PR updates miswritten remarks in pom.xml

### Why are the changes needed?

More accurate and standardized

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

It doesn't need to be tested. It is comment information that does not affect actual operation.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #42598 from chenyu-opensource/master.

Authored-by: chenyu-opensource 
<119398199+chenyu-opensou...@users.noreply.github.com>
Signed-off-by: Sean Owen 
---
 pom.xml | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pom.xml b/pom.xml
index 8edc3fd550c..efd1c6ffdb9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -153,7 +153,7 @@
 2.5.1
 2.0.8
 
 4.2.19
@@ -175,7 +175,7 @@
 2.12.18
 2.12
 2.2.0
-   
+   
 4.8.0
 false
 2.16.0
@@ -204,7 +204,7 @@
 3.1.9
 
 2.40
 2.12.5


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.5 updated: [SPARK-45042][BUILD][3.5] Upgrade jetty to 9.4.52.v20230823

2023-09-04 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 950b2f29105 [SPARK-45042][BUILD][3.5] Upgrade jetty to 9.4.52.v20230823
950b2f29105 is described below

commit 950b2f29105cd66355eef10503a93d678087c79e
Author: panbingkun 
AuthorDate: Mon Sep 4 09:01:50 2023 -0500

[SPARK-45042][BUILD][3.5] Upgrade jetty to 9.4.52.v20230823

### What changes were proposed in this pull request?
The PR aims to upgrade jetty from 9.4.51.v20230217 to 9.4.52.v20230823. (Backport to Spark 3.5.0)

### Why are the changes needed?
- This is a release of the https://github.com/eclipse/jetty.project/issues/7958 that was sponsored by a [support contract from Webtide.com](mailto:sales@webtide.com)

- The newest version fixes a possible security issue:
   This release provides a workaround for Security Advisory 
https://github.com/advisories/GHSA-58qw-p7qm-5rvh

- The release note as follows:
   
https://github.com/eclipse/jetty.project/releases/tag/jetty-9.4.52.v20230823

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Passed GitHub Actions (GA).

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #42795 from panbingkun/branch-3.5_SPARK-45042.

Authored-by: panbingkun 
Signed-off-by: Sean Owen 
---
 dev/deps/spark-deps-hadoop-3-hive-2.3 | 4 ++--
 pom.xml   | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 
b/dev/deps/spark-deps-hadoop-3-hive-2.3
index b6aba589d5f..1d02f8dba56 100644
--- a/dev/deps/spark-deps-hadoop-3-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -130,8 +130,8 @@ 
jersey-container-servlet/2.40//jersey-container-servlet-2.40.jar
 jersey-hk2/2.40//jersey-hk2-2.40.jar
 jersey-server/2.40//jersey-server-2.40.jar
 jettison/1.1//jettison-1.1.jar
-jetty-util-ajax/9.4.51.v20230217//jetty-util-ajax-9.4.51.v20230217.jar
-jetty-util/9.4.51.v20230217//jetty-util-9.4.51.v20230217.jar
+jetty-util-ajax/9.4.52.v20230823//jetty-util-ajax-9.4.52.v20230823.jar
+jetty-util/9.4.52.v20230823//jetty-util-9.4.52.v20230823.jar
 jline/2.14.6//jline-2.14.6.jar
 joda-time/2.12.5//joda-time-2.12.5.jar
 jodd-core/3.5.2//jodd-core-3.5.2.jar
diff --git a/pom.xml b/pom.xml
index 154ca4005f6..8fc4b89a78c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -143,7 +143,7 @@
 1.13.1
 1.9.1
 shaded-protobuf
-9.4.51.v20230217
+9.4.52.v20230823
 4.0.3
 0.10.0
 

[spark] branch branch-3.4 updated: [SPARK-44846][SQL] Convert the lower redundant Aggregate to Project in RemoveRedundantAggregates

2023-09-04 Thread yumwang
This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 5a1c6e6ffe2 [SPARK-44846][SQL] Convert the lower redundant Aggregate 
to Project in RemoveRedundantAggregates
5a1c6e6ffe2 is described below

commit 5a1c6e6ffe244461d23de98ddb317904db19fc4b
Author: zml1206 
AuthorDate: Mon Sep 4 20:23:39 2023 +0800

[SPARK-44846][SQL] Convert the lower redundant Aggregate to Project in 
RemoveRedundantAggregates

### What changes were proposed in this pull request?
This PR provides a safe way to remove a redundant `Aggregate` in the 
`RemoveRedundantAggregates` rule: instead of dropping it outright, the lower 
redundant `Aggregate` is converted to a `Project`.

### Why are the changes needed?
After `RemoveRedundantAggregates`, the remaining aggregate can contain complex 
grouping expressions. If `aggregateExpressions` contains (if / case) branches, 
`groupingExpressions` may no longer be a subexpression of 
`aggregateExpressions` once the `PushFoldableIntoBranches` rule has run, which 
then causes a `boundReference` error.
For example
```
SELECT c * 2 AS d
FROM (
 SELECT if(b > 1, 1, b) AS c
 FROM (
  SELECT if(a < 0, 0, a) AS b
  FROM VALUES (-1), (1), (2) AS t1(a)
  ) t2
 GROUP BY b
 ) t3
GROUP BY c
```
Before this PR:
```
== Optimized Logical Plan ==
Aggregate [if ((b#0 > 1)) 1 else b#0], [if ((b#0 > 1)) 2 else (b#0 * 2) AS 
d#2]
+- Project [if ((a#3 < 0)) 0 else a#3 AS b#0]
   +- LocalRelation [a#3]
```
```
== Error ==
Couldn't find b#0 in [if ((b#0 > 1)) 1 else b#0#7]
java.lang.IllegalStateException: Couldn't find b#0 in [if ((b#0 > 1)) 1 
else b#0#7]
at 
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80)
at 
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:73)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:466)
at 
org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren(TreeNode.scala:1241)
at 
org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren$(TreeNode.scala:1240)
at 
org.apache.spark.sql.catalyst.expressions.BinaryExpression.mapChildren(Expression.scala:653)
..
```
After this PR:
```
== Optimized Logical Plan ==
Aggregate [c#1], [(c#1 * 2) AS d#2]
+- Project [if ((b#0 > 1)) 1 else b#0 AS c#1]
   +- Project [if ((a#3 < 0)) 0 else a#3 AS b#0]
  +- LocalRelation [a#3]
```
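
A hedged reproduction sketch in PySpark (assuming a SparkSession is available 
as `spark`; this sketch is illustrative and not part of the patch): with this 
change, the query from the description should optimize to the plan above and 
return results instead of raising the `IllegalStateException`.
```
query = """
SELECT c * 2 AS d
FROM (
 SELECT if(b > 1, 1, b) AS c
 FROM (
  SELECT if(a < 0, 0, a) AS b
  FROM VALUES (-1), (1), (2) AS t1(a)
  ) t2
 GROUP BY b
 ) t3
GROUP BY c
"""
df = spark.sql(query)      # `spark` is an assumed, already-created SparkSession
df.explain(extended=True)  # prints the optimized logical plan shown above
df.show()                  # now completes instead of failing at runtime
```
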
### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT

Closes #42633 from zml1206/SPARK-44846-2.

Authored-by: zml1206 
Signed-off-by: Yuming Wang 
(cherry picked from commit 32a87f03da7eef41161a5a7a3aba4a48e0421912)
Signed-off-by: Yuming Wang 
---
 .../optimizer/RemoveRedundantAggregates.scala   | 19 ++-
 .../optimizer/RemoveRedundantAggregatesSuite.scala  | 21 -
 .../test/resources/sql-tests/inputs/group-by.sql| 13 +
 .../resources/sql-tests/results/group-by.sql.out| 18 ++
 4 files changed, 45 insertions(+), 26 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala
index 2104bce3711..0c3d5bcf01a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.spark.sql.catalyst.analysis.PullOutNondeterministic
 import org.apache.spark.sql.catalyst.expressions.{AliasHelper, AttributeSet, 
ExpressionSet}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, 
Project}
@@ -32,22 +31,8 @@ object RemoveRedundantAggregates extends Rule[LogicalPlan] 
with AliasHelper {
   def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
 _.containsPattern(AGGREGATE), ruleId) {
 case upper @ Aggregate(_, 

[spark] branch branch-3.5 updated: [SPARK-44846][SQL] Convert the lower redundant Aggregate to Project in RemoveRedundantAggregates

2023-09-04 Thread yumwang
This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 5c801fc1171 [SPARK-44846][SQL] Convert the lower redundant Aggregate 
to Project in RemoveRedundantAggregates
5c801fc1171 is described below

commit 5c801fc11718293d76c39c9d79d943fec9103ae4
Author: zml1206 
AuthorDate: Mon Sep 4 20:23:39 2023 +0800

[SPARK-44846][SQL] Convert the lower redundant Aggregate to Project in 
RemoveRedundantAggregates

### What changes were proposed in this pull request?
This PR provides a safe way to remove a redundant `Aggregate` in the 
`RemoveRedundantAggregates` rule: instead of dropping it outright, the lower 
redundant `Aggregate` is converted to a `Project`.

### Why are the changes needed?
After `RemoveRedundantAggregates`, the remaining aggregate can contain complex 
grouping expressions. If `aggregateExpressions` contains (if / case) branches, 
`groupingExpressions` may no longer be a subexpression of 
`aggregateExpressions` once the `PushFoldableIntoBranches` rule has run, which 
then causes a `boundReference` error.
For example
```
SELECT c * 2 AS d
FROM (
 SELECT if(b > 1, 1, b) AS c
 FROM (
  SELECT if(a < 0, 0, a) AS b
  FROM VALUES (-1), (1), (2) AS t1(a)
  ) t2
 GROUP BY b
 ) t3
GROUP BY c
```
Before this PR:
```
== Optimized Logical Plan ==
Aggregate [if ((b#0 > 1)) 1 else b#0], [if ((b#0 > 1)) 2 else (b#0 * 2) AS 
d#2]
+- Project [if ((a#3 < 0)) 0 else a#3 AS b#0]
   +- LocalRelation [a#3]
```
```
== Error ==
Couldn't find b#0 in [if ((b#0 > 1)) 1 else b#0#7]
java.lang.IllegalStateException: Couldn't find b#0 in [if ((b#0 > 1)) 1 
else b#0#7]
at 
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80)
at 
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:73)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:466)
at 
org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren(TreeNode.scala:1241)
at 
org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren$(TreeNode.scala:1240)
at 
org.apache.spark.sql.catalyst.expressions.BinaryExpression.mapChildren(Expression.scala:653)
..
```
After this PR:
```
== Optimized Logical Plan ==
Aggregate [c#1], [(c#1 * 2) AS d#2]
+- Project [if ((b#0 > 1)) 1 else b#0 AS c#1]
   +- Project [if ((a#3 < 0)) 0 else a#3 AS b#0]
  +- LocalRelation [a#3]
```
### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT

Closes #42633 from zml1206/SPARK-44846-2.

Authored-by: zml1206 
Signed-off-by: Yuming Wang 
(cherry picked from commit 32a87f03da7eef41161a5a7a3aba4a48e0421912)
Signed-off-by: Yuming Wang 
---
 .../optimizer/RemoveRedundantAggregates.scala   | 19 ++-
 .../optimizer/RemoveRedundantAggregatesSuite.scala  | 21 -
 .../sql-tests/analyzer-results/group-by.sql.out | 21 +
 .../test/resources/sql-tests/inputs/group-by.sql| 13 +
 .../resources/sql-tests/results/group-by.sql.out| 18 ++
 5 files changed, 66 insertions(+), 26 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala
index 93f3557a8c8..badf4065f5f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.spark.sql.catalyst.analysis.PullOutNondeterministic
 import org.apache.spark.sql.catalyst.expressions.{AliasHelper, AttributeSet, 
ExpressionSet}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, 
Project}
@@ -32,22 +31,8 @@ object RemoveRedundantAggregates extends Rule[LogicalPlan] 
with AliasHelper {
   def apply(plan: LogicalPlan): LogicalPlan = 

[spark] branch master updated: [SPARK-44846][SQL] Convert the lower redundant Aggregate to Project in RemoveRedundantAggregates

2023-09-04 Thread yumwang
This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 32a87f03da7 [SPARK-44846][SQL] Convert the lower redundant Aggregate 
to Project in RemoveRedundantAggregates
32a87f03da7 is described below

commit 32a87f03da7eef41161a5a7a3aba4a48e0421912
Author: zml1206 
AuthorDate: Mon Sep 4 20:23:39 2023 +0800

[SPARK-44846][SQL] Convert the lower redundant Aggregate to Project in 
RemoveRedundantAggregates

### What changes were proposed in this pull request?
This PR provides a safe way to remove a redundant `Aggregate` in the 
`RemoveRedundantAggregates` rule: instead of dropping it outright, the lower 
redundant `Aggregate` is converted to a `Project`.

### Why are the changes needed?
After `RemoveRedundantAggregates`, the remaining aggregate can contain complex 
grouping expressions. If `aggregateExpressions` contains (if / case) branches, 
`groupingExpressions` may no longer be a subexpression of 
`aggregateExpressions` once the `PushFoldableIntoBranches` rule has run, which 
then causes a `boundReference` error.
For example
```
SELECT c * 2 AS d
FROM (
 SELECT if(b > 1, 1, b) AS c
 FROM (
  SELECT if(a < 0, 0, a) AS b
  FROM VALUES (-1), (1), (2) AS t1(a)
  ) t2
 GROUP BY b
 ) t3
GROUP BY c
```
Before this PR:
```
== Optimized Logical Plan ==
Aggregate [if ((b#0 > 1)) 1 else b#0], [if ((b#0 > 1)) 2 else (b#0 * 2) AS 
d#2]
+- Project [if ((a#3 < 0)) 0 else a#3 AS b#0]
   +- LocalRelation [a#3]
```
```
== Error ==
Couldn't find b#0 in [if ((b#0 > 1)) 1 else b#0#7]
java.lang.IllegalStateException: Couldn't find b#0 in [if ((b#0 > 1)) 1 
else b#0#7]
at 
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80)
at 
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:73)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:466)
at 
org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren(TreeNode.scala:1241)
at 
org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren$(TreeNode.scala:1240)
at 
org.apache.spark.sql.catalyst.expressions.BinaryExpression.mapChildren(Expression.scala:653)
..
```
After this PR:
```
== Optimized Logical Plan ==
Aggregate [c#1], [(c#1 * 2) AS d#2]
+- Project [if ((b#0 > 1)) 1 else b#0 AS c#1]
   +- Project [if ((a#3 < 0)) 0 else a#3 AS b#0]
  +- LocalRelation [a#3]
```
### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT

Closes #42633 from zml1206/SPARK-44846-2.

Authored-by: zml1206 
Signed-off-by: Yuming Wang 
---
 .../optimizer/RemoveRedundantAggregates.scala   | 19 ++-
 .../optimizer/RemoveRedundantAggregatesSuite.scala  | 21 -
 .../sql-tests/analyzer-results/group-by.sql.out | 21 +
 .../test/resources/sql-tests/inputs/group-by.sql| 13 +
 .../resources/sql-tests/results/group-by.sql.out| 18 ++
 5 files changed, 66 insertions(+), 26 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala
index 93f3557a8c8..badf4065f5f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.spark.sql.catalyst.analysis.PullOutNondeterministic
 import org.apache.spark.sql.catalyst.expressions.{AliasHelper, AttributeSet, 
ExpressionSet}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, 
Project}
@@ -32,22 +31,8 @@ object RemoveRedundantAggregates extends Rule[LogicalPlan] 
with AliasHelper {
   def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
 _.containsPattern(AGGREGATE), ruleId) {
 case upper @ Aggregate(_, _, lower: Aggregate) if 

[spark] branch master updated (b0b7835bee2 -> 416207659aa)

2023-09-04 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from b0b7835bee2 [SPARK-45059][CONNECT][PYTHON] Add `try_reflect` functions 
to Scala and Python
 add 416207659aa [SPARK-45033][SQL] Support maps by parameterized `sql()`

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/catalyst/analysis/parameters.scala   | 15 --
 .../org/apache/spark/sql/ParametersSuite.scala | 62 +-
 2 files changed, 72 insertions(+), 5 deletions(-)





[spark] branch master updated: [SPARK-45059][CONNECT][PYTHON] Add `try_reflect` functions to Scala and Python

2023-09-04 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b0b7835bee2 [SPARK-45059][CONNECT][PYTHON] Add `try_reflect` functions 
to Scala and Python
b0b7835bee2 is described below

commit b0b7835bee2837c6e2875547aca259e02d2b0af7
Author: Jia Fan 
AuthorDate: Mon Sep 4 16:28:08 2023 +0800

[SPARK-45059][CONNECT][PYTHON] Add `try_reflect` functions to Scala and 
Python

### What changes were proposed in this pull request?
Add the new `try_reflect` function to Python and Spark Connect.

### Why are the changes needed?
For parity with the SQL function.

### Does this PR introduce _any_ user-facing change?
Yes, a new function is added.
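
A hedged usage sketch (assuming a SparkSession named `spark` and a Spark 
version where `try_reflect` is available; the sample data is made up): a 
reflective call that throws returns NULL instead of failing the query.
```
from pyspark.sql import functions as F

df = spark.createDataFrame(
    [("a5cf6c42-0c85-418f-af6c-3e4e5b1328f2",), ("not-a-uuid",)], ["s"]
)
# The second row makes UUID.fromString throw; try_reflect yields NULL for it.
df.select(
    F.try_reflect(F.lit("java.util.UUID"), F.lit("fromString"), F.col("s"))
).show(truncate=False)
```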

### How was this patch tested?
Added a new test.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #42783 from Hisoka-X/SPARK-45059_try_reflect_to_python.

Authored-by: Jia Fan 
Signed-off-by: Ruifeng Zheng 
---
 .../scala/org/apache/spark/sql/functions.scala |   9 ++
 .../apache/spark/sql/PlanGenerationTestSuite.scala |   4 +++
 .../explain-results/function_try_reflect.explain   |   2 ++
 .../query-tests/queries/function_try_reflect.json  |  33 +
 .../queries/function_try_reflect.proto.bin | Bin 0 -> 216 bytes
 .../source/reference/pyspark.sql/functions.rst |   1 +
 python/pyspark/sql/connect/functions.py|   7 +
 python/pyspark/sql/functions.py|  29 ++
 python/pyspark/sql/tests/test_functions.py |   2 --
 .../scala/org/apache/spark/sql/functions.scala |   3 +-
 10 files changed, 87 insertions(+), 3 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
index baafdd4e172..9ead800ace7 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3629,6 +3629,15 @@ object functions {
*/
   def java_method(cols: Column*): Column = Column.fn("java_method", cols: _*)
 
+  /**
+   * This is a special version of `reflect` that performs the same operation, 
but returns a NULL
+   * value instead of raising an error if the invoked method throws an exception.
+   *
+   * @group misc_funcs
+   * @since 4.0.0
+   */
+  def try_reflect(cols: Column*): Column = Column.fn("try_reflect", cols: _*)
+
   /**
* Returns the Spark version. The string contains 2 fields, the first being 
a release version
* and the second being a git revision.
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index c457f269213..aa15fbd75ff 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -2864,6 +2864,10 @@ class PlanGenerationTestSuite
 fn.java_method(lit("java.util.UUID"), lit("fromString"), fn.col("g"))
   }
 
+  functionTest("try_reflect") {
+fn.try_reflect(lit("java.util.UUID"), lit("fromString"), fn.col("g"))
+  }
+
   functionTest("typeof") {
 fn.typeof(fn.col("g"))
   }
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_reflect.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_reflect.explain
new file mode 100644
index 000..5c68f3bf2c1
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_reflect.explain
@@ -0,0 +1,2 @@
+Project [reflect(java.util.UUID, fromString, g#0, false) AS 
try_reflect(java.util.UUID, fromString, g)#0]
++- LocalRelation , [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/function_try_reflect.json
 
b/connector/connect/common/src/test/resources/query-tests/queries/function_try_reflect.json
new file mode 100644
index 000..de3fae90c2c
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/function_try_reflect.json
@@ -0,0 +1,33 @@
+{
+  "common": {
+"planId": "1"
+  },
+  "project": {
+"input": {
+  "common": {
+"planId": "0"
+  },
+  "localRelation": {
+"schema": 
"struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+  }
+},
+"expressions": [{
+  "unresolvedFunction": {
+"functionName": 

[spark] branch branch-3.5 updated: [SPARK-45052][SQL][PYTHON][CONNECT][3.5] Make function aliases output column name consistent with SQL

2023-09-04 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 6112d78cba2 [SPARK-45052][SQL][PYTHON][CONNECT][3.5] Make function 
aliases output column name consistent with SQL
6112d78cba2 is described below

commit 6112d78cba20fd2e9aa298190371dd52205dc762
Author: Ruifeng Zheng 
AuthorDate: Mon Sep 4 16:24:43 2023 +0800

[SPARK-45052][SQL][PYTHON][CONNECT][3.5] Make function aliases output 
column name consistent with SQL

### What changes were proposed in this pull request?
Backport of https://github.com/apache/spark/pull/42775 to branch-3.5.

### Why are the changes needed?
To make the output column name of `func(col)` consistent with that of 
`expr("func(col)")`.

### Does this PR introduce _any_ user-facing change?
Yes, the output column names of the aliased functions change to match the 
alias names.
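
A hedged sketch of the user-facing effect (assuming a SparkSession named 
`spark` and Spark 3.5 where `std` exists; the column names in the comments 
are the expected outcome, not verified here):
```
from pyspark.sql import functions as F

df = spark.range(5).toDF("v")
df.select(F.std("v")).printSchema()        # expected column name: std(v)
df.select(F.expr("std(v)")).printSchema()  # same name via the SQL expression
```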

### How was this patch tested?
CI.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #42786 from zhengruifeng/try_column_name_35.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 .../scala/org/apache/spark/sql/functions.scala |  12 +-
 .../query-tests/explain-results/describe.explain   |   2 +-
 .../explain-results/function_ceiling.explain   |   2 +-
 .../explain-results/function_ceiling_scale.explain |   2 +-
 .../explain-results/function_printf.explain|   2 +-
 .../explain-results/function_sign.explain  |   2 +-
 .../explain-results/function_std.explain   |   2 +-
 .../query-tests/queries/function_ceiling.json  |   2 +-
 .../query-tests/queries/function_ceiling.proto.bin | Bin 173 -> 176 bytes
 .../queries/function_ceiling_scale.json|   2 +-
 .../queries/function_ceiling_scale.proto.bin   | Bin 179 -> 182 bytes
 .../query-tests/queries/function_printf.json   |   2 +-
 .../query-tests/queries/function_printf.proto.bin  | Bin 196 -> 189 bytes
 .../query-tests/queries/function_sign.json |   2 +-
 .../query-tests/queries/function_sign.proto.bin| Bin 175 -> 173 bytes
 .../query-tests/queries/function_std.json  |   2 +-
 .../query-tests/queries/function_std.proto.bin | Bin 175 -> 172 bytes
 python/pyspark/sql/connect/functions.py|  26 +-
 python/pyspark/sql/functions.py| 714 +++--
 .../scala/org/apache/spark/sql/functions.scala | 202 +++---
 20 files changed, 628 insertions(+), 348 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
index fa8c5782e06..fe992ae6740 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
@@ -987,7 +987,7 @@ object functions {
* @group agg_funcs
* @since 3.5.0
*/
-  def std(e: Column): Column = stddev(e)
+  def std(e: Column): Column = Column.fn("std", e)
 
   /**
* Aggregate function: alias for `stddev_samp`.
@@ -2337,7 +2337,7 @@ object functions {
* @group math_funcs
* @since 3.5.0
*/
-  def ceiling(e: Column, scale: Column): Column = ceil(e, scale)
+  def ceiling(e: Column, scale: Column): Column = Column.fn("ceiling", e, 
scale)
 
   /**
* Computes the ceiling of the given value of `e` to 0 decimal places.
@@ -2345,7 +2345,7 @@ object functions {
* @group math_funcs
* @since 3.5.0
*/
-  def ceiling(e: Column): Column = ceil(e)
+  def ceiling(e: Column): Column = Column.fn("ceiling", e)
 
   /**
* Convert a number in a string column from one base to another.
@@ -2800,7 +2800,7 @@ object functions {
* @group math_funcs
* @since 3.5.0
*/
-  def power(l: Column, r: Column): Column = pow(l, r)
+  def power(l: Column, r: Column): Column = Column.fn("power", l, r)
 
   /**
* Returns the positive value of dividend mod divisor.
@@ -2937,7 +2937,7 @@ object functions {
* @group math_funcs
* @since 3.5.0
*/
-  def sign(e: Column): Column = signum(e)
+  def sign(e: Column): Column = Column.fn("sign", e)
 
   /**
* Computes the signum of the given value.
@@ -4420,7 +4420,7 @@ object functions {
* @since 3.5.0
*/
   def printf(format: Column, arguments: Column*): Column =
-Column.fn("format_string", lit(format) +: arguments: _*)
+Column.fn("printf", (format +: arguments): _*)
 
   /**
* Decodes a `str` in 'application/x-www-form-urlencoded' format using a 
specific encoding
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/describe.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/describe.explain
index f205f7ef7a1..b203f715c71 100644
--- 
a/connector/connect/common/src/test/resources/query-tests/explain-results/describe.explain
+++ 

[spark] branch master updated (f2a6c97d718 -> d03ebced0ef)

2023-09-04 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from f2a6c97d718 [SPARK-44876][PYTHON][FOLLOWUP] Fix Arrow-optimized Python 
UDF to delay wrapping the function with fail_on_stopiteration
 add d03ebced0ef [SPARK-45060][SQL] Fix an internal error from 
`to_char()` on `NULL` format

No new revisions were added by this update.

Summary of changes:
 common/utils/src/main/resources/error/error-classes.json   |  5 +
 ...error-conditions-invalid-parameter-value-error-class.md |  4 
 .../sql/catalyst/expressions/numberFormatExpressions.scala |  8 ++--
 .../apache/spark/sql/errors/QueryCompilationErrors.scala   |  8 
 .../scala/org/apache/spark/sql/StringFunctionsSuite.scala  | 14 ++
 5 files changed, 37 insertions(+), 2 deletions(-)
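
A hedged sketch of the user-facing effect (assuming a SparkSession named 
`spark`; the exact error class is an inference from the touched error-class 
files, not confirmed here): a `NULL` format should now raise a regular 
invalid-parameter error instead of an internal error.
```
try:
    spark.sql("SELECT to_char(454, NULL)").show()
except Exception as e:
    # Expected: a user-facing invalid-parameter error, not INTERNAL_ERROR.
    print(type(e).__name__, e)
```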





[spark] branch master updated: [SPARK-44876][PYTHON][FOLLOWUP] Fix Arrow-optimized Python UDF to delay wrapping the function with fail_on_stopiteration

2023-09-04 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f2a6c97d718 [SPARK-44876][PYTHON][FOLLOWUP] Fix Arrow-optimized Python 
UDF to delay wrapping the function with fail_on_stopiteration
f2a6c97d718 is described below

commit f2a6c97d718839896343feaa520396f328f2f866
Author: Takuya UESHIN 
AuthorDate: Mon Sep 4 15:24:33 2023 +0800

[SPARK-44876][PYTHON][FOLLOWUP] Fix Arrow-optimized Python UDF to delay 
wrapping the function with fail_on_stopiteration

### What changes were proposed in this pull request?

Fixes Arrow-optimized Python UDF to delay wrapping the function with 
`fail_on_stopiteration`.

Also removes the unnecessary `verify_result_type` verification.

### Why are the changes needed?

For Arrow-optimized Python UDFs, `fail_on_stopiteration` can be applied only 
to the wrapped function, avoiding unnecessary overhead.
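
For background, a minimal conceptual sketch of what such a wrapper does 
(illustrative only, not the actual `pyspark.util.fail_on_stopiteration` 
implementation): an escaping `StopIteration` is converted into an explicit 
error so it cannot silently terminate the surrounding evaluation loop.
```
import functools

def fail_on_stopiteration_sketch(f):
    """Illustrative stand-in: re-raise StopIteration from user code as a
    RuntimeError so it is reported instead of silently ending iteration."""
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except StopIteration as exc:
            raise RuntimeError("StopIteration raised in user code") from exc
    return wrapper

@fail_on_stopiteration_sketch
def flaky_udf(a):
    if a < 5:
        return a
    raise StopIteration()
```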

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added the related test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #42784 from ueshin/issues/SPARK-44876/fail_on_stopiteration.

Authored-by: Takuya UESHIN 
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/sql/tests/test_udf.py | 15 +++
 python/pyspark/worker.py | 22 ++
 2 files changed, 21 insertions(+), 16 deletions(-)

diff --git a/python/pyspark/sql/tests/test_udf.py 
b/python/pyspark/sql/tests/test_udf.py
index 32ea05bd00a..1f895b1780b 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -1005,6 +1005,21 @@ class BaseUDFTestsMixin(object):
 with self.subTest(with_b=True, query_no=i):
 assertDataFrameEqual(df, [Row(0), Row(101)])
 
+def test_raise_stop_iteration(self):
+@udf("int")
+def test_udf(a):
+if a < 5:
+return a
+else:
+raise StopIteration()
+
+assertDataFrameEqual(
+self.spark.range(5).select(test_udf(col("id"))), [Row(i) for i in 
range(5)]
+)
+
+with self.assertRaisesRegex(PythonException, "StopIteration"):
+self.spark.range(10).select(test_udf(col("id"))).show()
+
 
 class UDFTests(BaseUDFTestsMixin, ReusedSQLTestCase):
 @classmethod
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index fff99f1de3d..92bc622775b 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -139,6 +139,7 @@ def wrap_arrow_batch_udf(f, return_type):
 elif type(return_type) == BinaryType:
 result_func = lambda r: bytes(r) if r is not None else r  # noqa: E731
 
+@fail_on_stopiteration
 def evaluate(*args: pd.Series, **kwargs: pd.Series) -> pd.Series:
 keys = list(kwargs.keys())
 len_args = len(args)
@@ -151,18 +152,6 @@ def wrap_arrow_batch_udf(f, return_type):
 ]
 )
 
-def verify_result_type(result):
-if not hasattr(result, "__len__"):
-pd_type = "pandas.DataFrame" if type(return_type) == StructType 
else "pandas.Series"
-raise PySparkTypeError(
-error_class="UDF_RETURN_TYPE",
-message_parameters={
-"expected": pd_type,
-"actual": type(result).__name__,
-},
-)
-return result
-
 def verify_result_length(result, length):
 if len(result) != length:
 raise PySparkRuntimeError(
@@ -175,9 +164,7 @@ def wrap_arrow_batch_udf(f, return_type):
 return result
 
 return lambda *a, **kw: (
-verify_result_length(
-verify_result_type(evaluate(*a, **kw)), len((list(a) + 
list(kw.values()))[0])
-),
+verify_result_length(evaluate(*a, **kw), len((list(a) + 
list(kw.values()))[0])),
 arrow_return_type,
 )
 
@@ -562,7 +549,10 @@ def read_single_udf(pickleSer, infile, eval_type, 
runner_conf, udf_index):
 else:
 chained_func = chain(chained_func, f)
 
-if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF:
+if eval_type in (
+PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
+PythonEvalType.SQL_ARROW_BATCHED_UDF,
+):
 func = chained_func
 else:
 # make sure StopIteration's raised in the user code are not ignored





[spark] branch branch-3.5 updated: [SPARK-44876][PYTHON][FOLLOWUP][3.5] Fix Arrow-optimized Python UDF to delay wrapping the function with fail_on_stopiteration

2023-09-04 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new fe3a20a5e23 [SPARK-44876][PYTHON][FOLLOWUP][3.5] Fix Arrow-optimized 
Python UDF to delay wrapping the function with fail_on_stopiteration
fe3a20a5e23 is described below

commit fe3a20a5e231fd151b141e72ea8a1090647a
Author: Takuya UESHIN 
AuthorDate: Mon Sep 4 15:25:33 2023 +0800

[SPARK-44876][PYTHON][FOLLOWUP][3.5] Fix Arrow-optimized Python UDF to 
delay wrapping the function with fail_on_stopiteration

### What changes were proposed in this pull request?

This is a backport of https://github.com/apache/spark/pull/42784.

Fixes Arrow-optimized Python UDF to delay wrapping the function with 
`fail_on_stopiteration`.

Also removes the unnecessary `verify_result_type` verification.

### Why are the changes needed?

For Arrow-optimized Python UDFs, `fail_on_stopiteration` can be applied only 
to the wrapped function, avoiding unnecessary overhead.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added the related test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #42785 from ueshin/issues/SPARK-44876/3.5/fail_on_stopiteration.

Authored-by: Takuya UESHIN 
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/sql/tests/test_udf.py | 21 ++---
 python/pyspark/worker.py | 22 +++---
 2 files changed, 25 insertions(+), 18 deletions(-)

diff --git a/python/pyspark/sql/tests/test_udf.py 
b/python/pyspark/sql/tests/test_udf.py
index 239ff27813b..2f8c1cd2136 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -24,7 +24,7 @@ import datetime
 
 from pyspark import SparkContext, SQLContext
 from pyspark.sql import SparkSession, Column, Row
-from pyspark.sql.functions import udf, assert_true, lit, rand
+from pyspark.sql.functions import col, udf, assert_true, lit, rand
 from pyspark.sql.udf import UserDefinedFunction
 from pyspark.sql.types import (
 StringType,
@@ -38,9 +38,9 @@ from pyspark.sql.types import (
 TimestampNTZType,
 DayTimeIntervalType,
 )
-from pyspark.errors import AnalysisException, PySparkTypeError
+from pyspark.errors import AnalysisException, PythonException, PySparkTypeError
 from pyspark.testing.sqlutils import ReusedSQLTestCase, test_compiled, 
test_not_compiled_message
-from pyspark.testing.utils import QuietTest
+from pyspark.testing.utils import QuietTest, assertDataFrameEqual
 
 
 class BaseUDFTestsMixin(object):
@@ -898,6 +898,21 @@ class BaseUDFTestsMixin(object):
 self.assertEquals(row[1], {"a": "b"})
 self.assertEquals(row[2], Row(col1=1, col2=2))
 
+def test_raise_stop_iteration(self):
+@udf("int")
+def test_udf(a):
+if a < 5:
+return a
+else:
+raise StopIteration()
+
+assertDataFrameEqual(
+self.spark.range(5).select(test_udf(col("id"))), [Row(i) for i in 
range(5)]
+)
+
+with self.assertRaisesRegex(PythonException, "StopIteration"):
+self.spark.range(10).select(test_udf(col("id"))).show()
+
 
 class UDFTests(BaseUDFTestsMixin, ReusedSQLTestCase):
 @classmethod
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index edbfad4a5dc..d2ea18c45c9 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -154,20 +154,9 @@ def wrap_arrow_batch_udf(f, return_type):
 elif type(return_type) == BinaryType:
 result_func = lambda r: bytes(r) if r is not None else r  # noqa: E731
 
+@fail_on_stopiteration
 def evaluate(*args: pd.Series) -> pd.Series:
-return pd.Series(result_func(f(*a)) for a in zip(*args))
-
-def verify_result_type(result):
-if not hasattr(result, "__len__"):
-pd_type = "pandas.DataFrame" if type(return_type) == StructType 
else "pandas.Series"
-raise PySparkTypeError(
-error_class="UDF_RETURN_TYPE",
-message_parameters={
-"expected": pd_type,
-"actual": type(result).__name__,
-},
-)
-return result
+return pd.Series([result_func(f(*a)) for a in zip(*args)])
 
 def verify_result_length(result, length):
 if len(result) != length:
@@ -181,7 +170,7 @@ def wrap_arrow_batch_udf(f, return_type):
 return result
 
 return lambda *a: (
-verify_result_length(verify_result_type(evaluate(*a)), len(a[0])),
+verify_result_length(evaluate(*a), len(a[0])),
 arrow_return_type,
 )
 
@@ -543,7 +532,10 @@ def read_single_udf(pickleSer, infile, eval_type,