[spark] branch master updated: [SPARK-43241][PS] `MultiIndex.append` not checking names for equality
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new d7e827e52c45 [SPARK-43241][PS] `MultiIndex.append` not checking names for equality d7e827e52c45 is described below commit d7e827e52c45cc048635a0ba7dafefc4c1c76463 Author: Haejoon Lee AuthorDate: Tue Sep 5 11:45:29 2023 +0800 [SPARK-43241][PS] `MultiIndex.append` not checking names for equality ### What changes were proposed in this pull request? This PR proposes to fix the behavior of `MultiIndex.append` to do not checking names. ### Why are the changes needed? To match the behavior with pandas according to https://github.com/pandas-dev/pandas/pull/48288 ### Does this PR introduce _any_ user-facing change? Yes, the behavior is changed to match with pandas: **Testing data** ```python >>> psmidx1 MultiIndex([('a', 'x', 1), ('b', 'y', 2), ('c', 'z', 3)], names=['x', 'y', 'z']) >>> psmidx2 MultiIndex([('a', 'x', 1), ('b', 'y', 2), ('c', 'z', 3)], names=['p', 'q', 'r']) ``` **Before** ```python >>> psmidx1.append(psmidx2) MultiIndex([('a', 'x', 1), ('b', 'y', 2), ('c', 'z', 3), ('a', 'x', 1), ('b', 'y', 2), ('c', 'z', 3)], names=['x', 'y', 'z']) ``` **After** ```python >>> psmidx1.append(psmidx2) MultiIndex([('a', 'x', 1), ('b', 'y', 2), ('c', 'z', 3), ('a', 'x', 1), ('b', 'y', 2), ('c', 'z', 3)], ) ``` ### How was this patch tested? Fix the existing UTs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #42787 from itholic/SPARK-43241. Authored-by: Haejoon Lee Signed-off-by: Ruifeng Zheng --- python/pyspark/pandas/indexes/base.py | 8 +-- .../pyspark/pandas/tests/indexes/test_base_slow.py | 26 +++--- 2 files changed, 4 insertions(+), 30 deletions(-) diff --git a/python/pyspark/pandas/indexes/base.py b/python/pyspark/pandas/indexes/base.py index bfde7e554ba5..c7d9f4e57467 100644 --- a/python/pyspark/pandas/indexes/base.py +++ b/python/pyspark/pandas/indexes/base.py @@ -1917,18 +1917,12 @@ class Index(IndexOpsMixin): sdf_other = other._internal.spark_frame.select(other._internal.index_spark_columns) sdf_appended = sdf_self.union(sdf_other) -# names should be kept when MultiIndex, but Index wouldn't keep its name. -if isinstance(self, MultiIndex): -index_names = self._internal.index_names -else: -index_names = None - internal = InternalFrame( spark_frame=sdf_appended, index_spark_columns=[ scol_for(sdf_appended, col) for col in self._internal.index_spark_column_names ], -index_names=index_names, +index_names=None, index_fields=index_fields, ) diff --git a/python/pyspark/pandas/tests/indexes/test_base_slow.py b/python/pyspark/pandas/tests/indexes/test_base_slow.py index b2a9e52bc61a..ca38b2ff9aff 100644 --- a/python/pyspark/pandas/tests/indexes/test_base_slow.py +++ b/python/pyspark/pandas/tests/indexes/test_base_slow.py @@ -107,29 +107,9 @@ class IndexesSlowTestsMixin: psmidx1 = ps.from_pandas(pmidx1) psmidx2 = ps.from_pandas(pmidx2) -# TODO(SPARK-43241): MultiIndex.append not checking names for equality. -# Also refer to https://github.com/pandas-dev/pandas/pull/48288. 
-if LooseVersion(pd.__version__) >= LooseVersion("2.0.0"): -self.assert_eq( -pmidx1.append(pmidx2), psmidx1.append(psmidx2).rename([None, None, None]) -) -else: -self.assert_eq(pmidx1.append(pmidx2), psmidx1.append(psmidx2)) - -if LooseVersion(pd.__version__) >= LooseVersion("2.0.0"): -self.assert_eq( -pmidx2.append(pmidx1), psmidx2.append(psmidx1).rename([None, None, None]) -) -else: -self.assert_eq(pmidx2.append(pmidx1), psmidx2.append(psmidx1)) - -if LooseVersion(pd.__version__) >= LooseVersion("2.0.0"): -self.assert_eq( -pmidx1.append(pmidx2).names, -psmidx1.append(psmidx2).rename([None, None, None]).names, -) -else: -self.assert_eq(pmidx1.append(pmidx2).names, psmidx1.append(psmidx2).names) +self.assert_eq(pmidx1.append(pmidx2), psmidx1.append(psmidx2)) +self.assert_eq(pmidx2.append(pmidx1),
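A minimal, self-contained sketch of the behavior change described above, assuming a pandas-on-Spark build that includes SPARK-43241; the index values are illustrative, and `rename` is shown only as a way to reapply names if they are still wanted.

```python
# Sketch: MultiIndex.append no longer carries names over when the two
# indexes disagree on them (matching pandas >= 2.0 semantics).
import pyspark.pandas as ps

psmidx1 = ps.MultiIndex.from_tuples(
    [("a", "x", 1), ("b", "y", 2), ("c", "z", 3)], names=["x", "y", "z"])
psmidx2 = ps.MultiIndex.from_tuples(
    [("a", "x", 1), ("b", "y", 2), ("c", "z", 3)], names=["p", "q", "r"])

appended = psmidx1.append(psmidx2)
print(appended.names)  # expected: [None, None, None] after this change

# Names can still be reapplied explicitly when needed:
print(appended.rename(["x", "y", "z"]).names)  # ['x', 'y', 'z']
```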
[spark] branch master updated: [SPARK-45073][PS][CONNECT] Replace `LastNotNull` with `Last(ignoreNulls=True)`
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 4686c2733702 [SPARK-45073][PS][CONNECT] Replace `LastNotNull` with `Last(ignoreNulls=True)` 4686c2733702 is described below commit 4686c27337025dd1a616da73b19abe7ea00a4624 Author: Ruifeng Zheng AuthorDate: Tue Sep 5 11:13:35 2023 +0800 [SPARK-45073][PS][CONNECT] Replace `LastNotNull` with `Last(ignoreNulls=True)` ### What changes were proposed in this pull request? Replace `LastNotNull` with `Last(ignoreNulls=True)` ### Why are the changes needed? https://github.com/apache/spark/pull/36127 introduced a PS dedicated expression `LastNotNull`, which was actually not needed and can be replaced with built-in `Last` ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? CI ### Was this patch authored or co-authored using generative AI tooling? NO Closes #42808 from zhengruifeng/del_last_not_none. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- .../sql/connect/planner/SparkConnectPlanner.scala | 4 --- python/pyspark/pandas/series.py| 2 +- python/pyspark/pandas/spark/functions.py | 14 .../catalyst/expressions/windowExpressions.scala | 37 -- .../spark/sql/api/python/PythonSQLUtils.scala | 2 -- 5 files changed, 1 insertion(+), 58 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 579b378d09f6..1a63c9fc27c6 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -1905,10 +1905,6 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging { val ignoreNA = extractBoolean(children(2), "ignoreNA") Some(EWM(children(0), alpha, ignoreNA)) - case "last_non_null" if fun.getArgumentsCount == 1 => -val children = fun.getArgumentsList.asScala.map(transformExpression) -Some(LastNonNull(children(0))) - case "null_index" if fun.getArgumentsCount == 1 => val children = fun.getArgumentsList.asScala.map(transformExpression) Some(NullIndex(children(0))) diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py index 7fa08c6d9b24..863e98c42ead 100644 --- a/python/pyspark/pandas/series.py +++ b/python/pyspark/pandas/series.py @@ -2257,7 +2257,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]): return self._psdf.copy()._psser_for(self._column_label) scol = self.spark.column -last_non_null = SF.last_non_null(scol) +last_non_null = F.last(scol, True) null_index = SF.null_index(scol) Window = get_window_class() diff --git a/python/pyspark/pandas/spark/functions.py b/python/pyspark/pandas/spark/functions.py index d6f6c6fdeebc..b0bc6efcd56e 100644 --- a/python/pyspark/pandas/spark/functions.py +++ b/python/pyspark/pandas/spark/functions.py @@ -159,20 +159,6 @@ def ewm(col: Column, alpha: float, ignore_na: bool) -> Column: return Column(sc._jvm.PythonSQLUtils.ewm(col._jc, alpha, ignore_na)) -def last_non_null(col: Column) -> Column: -if is_remote(): -from pyspark.sql.connect.functions import _invoke_function_over_columns - -return _invoke_function_over_columns( # type: ignore[return-value] -"last_non_null", -col, # type: ignore[arg-type] -) - -else: 
-sc = SparkContext._active_spark_context -return Column(sc._jvm.PythonSQLUtils.lastNonNull(col._jc)) - - def null_index(col: Column) -> Column: if is_remote(): from pyspark.sql.connect.functions import _invoke_function_over_columns diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala index 50c98c01645d..bc61170f567f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala @@ -1152,43 +1152,6 @@ case class EWM(input: Expression, alpha: Double, ignoreNA: Boolean) } -/** - * Keep the last non-null value seen if any. This expression is dedicated only for - * Pandas API on Spark. - * For example, - * Input: null, 1, 2, 3, null, 4, 5, null - * Output: null, 1, 2, 3, 3, 4, 5, 5 - */ -case class
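The user-visible behavior that `Last(ignoreNulls=True)` provides is the familiar forward-fill pattern. A small sketch (not taken from the patch) using the public `F.last(..., ignorenulls=True)` API over a running window:

```python
# last(col, ignorenulls=True) over a running window keeps the most recent
# non-null value seen so far, which is what the pandas-on-Spark ffill path uses.
from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, None), (2, 1), (3, None), (4, 4), (5, None)], ["id", "v"])

w = Window.orderBy("id").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("ffill", F.last("v", ignorenulls=True).over(w)).show()
# v:     null, 1, null, 4, null
# ffill: null, 1, 1,    4, 4
```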
[spark] branch master updated: [SPARK-45049][CONNECT][DOCS][TESTS] Refine docstrings of `coalesce/repartition/repartitionByRange`
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 66fb2250224f [SPARK-45049][CONNECT][DOCS][TESTS] Refine docstrings of `coalesce/repartition/repartitionByRange` 66fb2250224f is described below commit 66fb2250224ff9ffb71bf2b320ec05d1b33145c2 Author: Ruifeng Zheng AuthorDate: Tue Sep 5 11:09:31 2023 +0900 [SPARK-45049][CONNECT][DOCS][TESTS] Refine docstrings of `coalesce/repartition/repartitionByRange` ### What changes were proposed in this pull request? Enable doctests for `coalesce/repartition/repartitionByRange`, by using `explain` instead of `rdd.getNumPartitions` ### Why are the changes needed? test coverage ### Does this PR introduce _any_ user-facing change? yes ### How was this patch tested? updated doctests ### Was this patch authored or co-authored using generative AI tooling? NO Closes #42770 from zhengruifeng/enable_doctest_partition. Authored-by: Ruifeng Zheng Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/connect/dataframe.py | 4 -- python/pyspark/sql/dataframe.py | 117 +++- 2 files changed, 99 insertions(+), 22 deletions(-) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index 86a635361858..b22fdc1383cf 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -2191,10 +2191,6 @@ def _test() -> None: os.chdir(os.environ["SPARK_HOME"]) globs = pyspark.sql.connect.dataframe.__dict__.copy() -# Spark Connect does not support RDD but the tests depend on them. -del pyspark.sql.connect.dataframe.DataFrame.coalesce.__doc__ -del pyspark.sql.connect.dataframe.DataFrame.repartition.__doc__ -del pyspark.sql.connect.dataframe.DataFrame.repartitionByRange.__doc__ # TODO(SPARK-41625): Support Structured Streaming del pyspark.sql.connect.dataframe.DataFrame.isStreaming.__doc__ diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 4b8bdd1c2779..3d7bdd7a0b2b 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1763,9 +1763,27 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): Examples ->>> df = spark.range(10) ->>> df.coalesce(1).rdd.getNumPartitions() -1 +>>> from pyspark.sql import functions as sf +>>> spark.range(0, 10, 1, 3).select( +... sf.spark_partition_id().alias("partition") +... ).distinct().sort("partition").show() ++-+ +|partition| ++-+ +|0| +|1| +|2| ++-+ + +>>> from pyspark.sql import functions as sf +>>> spark.range(0, 10, 1, 3).coalesce(1).select( +... sf.spark_partition_id().alias("partition") +... ).distinct().sort("partition").show() ++-+ +|partition| ++-+ +|0| ++-+ """ return DataFrame(self._jdf.coalesce(numPartitions), self.sparkSession) @@ -1809,23 +1827,78 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): Examples ->>> df = spark.createDataFrame( -... [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"]) +>>> from pyspark.sql import functions as sf +>>> df = spark.range(0, 64, 1, 9).withColumn( +... "name", sf.concat(sf.lit("name_"), sf.col("id").cast("string")) +... ).withColumn( +... "age", sf.col("id") - 32 +... ) +>>> df.select( +... sf.spark_partition_id().alias("partition") +... ).distinct().sort("partition").show() ++-+ +|partition| ++-+ +|0| +|1| +|2| +|3| +|4| +|5| +|6| +|7| +|8| ++-+ Repartition the data into 10 partitions. 
->>> df.repartition(10).rdd.getNumPartitions() -10 +>>> df.repartition(10).select( +... sf.spark_partition_id().alias("partition") +... ).distinct().sort("partition").show() ++-+ +|partition| ++-+ +|0| +|1| +|2| +|3| +|4| +|5| +|6| +|7| +|8| +|9| ++-+ Repartition the data into 7 partitions by 'age' column. ->>> df.repartition(7, "age").rdd.getNumPartitions() -
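The pattern the refreshed doctests rely on, shown here as a standalone sketch: `spark_partition_id()` (or `explain()`) reveals partitioning without touching RDD APIs, which Spark Connect does not expose.

```python
# Count non-empty partitions without rdd.getNumPartitions(),
# so the same check works on classic and Spark Connect sessions.
from pyspark.sql import SparkSession, functions as sf

spark = SparkSession.builder.getOrCreate()
df = spark.range(0, 10, 1, 3)  # 10 rows spread over 3 partitions

print(df.select(sf.spark_partition_id()).distinct().count())              # 3
print(df.coalesce(1).select(sf.spark_partition_id()).distinct().count())  # 1

# explain() also shows the repartitioning operator in the plan.
df.repartition(7, "id").explain()
```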
[spark] branch master updated: [SPARK-42619][PS] Add `show_counts` parameter for DataFrame.info
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 8d358cdbd57 [SPARK-42619][PS] Add `show_counts` parameter for DataFrame.info 8d358cdbd57 is described below commit 8d358cdbd57e69a16c914f329d3e4173ceb7b1ef Author: zhyhimont AuthorDate: Tue Sep 5 08:45:57 2023 +0800 [SPARK-42619][PS] Add `show_counts` parameter for DataFrame.info ### What changes were proposed in this pull request? Added `show_counts` parameter for DataFrame.info ### Why are the changes needed? When pandas 2.0.0 is released, we should match the behavior in pandas API on Spark. ### Does this PR introduce _any_ user-facing change? Changed the name of the parameter `null_counts` to `show_counts` of the method DataFrame.info ### How was this patch tested? UT Closes #40436 from dzhigimont/SPARK-42619_ZH. Lead-authored-by: zhyhimont Co-authored-by: Zhyhimont Dmitry Signed-off-by: Ruifeng Zheng --- .../source/migration_guide/pyspark_upgrade.rst | 1 + python/pyspark/pandas/frame.py | 7 +++--- python/pyspark/pandas/indexes/base.py | 2 +- python/pyspark/pandas/tests/io/test_io.py | 28 ++ 4 files changed, 34 insertions(+), 4 deletions(-) diff --git a/python/docs/source/migration_guide/pyspark_upgrade.rst b/python/docs/source/migration_guide/pyspark_upgrade.rst index 9ec38ad2709..8b3058ba547 100644 --- a/python/docs/source/migration_guide/pyspark_upgrade.rst +++ b/python/docs/source/migration_guide/pyspark_upgrade.rst @@ -39,6 +39,7 @@ Upgrading from PySpark 3.5 to 4.0 * In Spark 4.0, the default value of ``regex`` parameter for ``Series.str.replace`` has been changed from ``True`` to ``False`` from pandas API on Spark. Additionally, a single character ``pat`` with ``regex=True`` is now treated as a regular expression instead of a string literal. * In Spark 4.0, the resulting name from ``value_counts`` for all objects sets to ``'count'`` (or ``'proportion'`` if ``normalize=True`` was passed) from pandas API on Spark, and the index will be named after the original object. * In Spark 4.0, ``squeeze`` parameter from ``ps.read_csv`` and ``ps.read_excel`` has been removed from pandas API on Spark. +* In Spark 4.0, ``null_counts`` parameter from ``DataFrame.info`` has been removed from pandas API on Spark, use ``show_counts`` instead. Upgrading from PySpark 3.3 to 3.4 diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py index 41ab03a5c0b..adbef607256 100644 --- a/python/pyspark/pandas/frame.py +++ b/python/pyspark/pandas/frame.py @@ -11948,12 +11948,12 @@ defaultdict(, {'col..., 'col...})] return cast(ps.Series, ps.from_pandas(psdf._to_internal_pandas().idxmin())) -# TODO(SPARK-41619): Add `show_counts` parameter and replace with `null_counts`. def info( self, verbose: Optional[bool] = None, buf: Optional[IO[str]] = None, max_cols: Optional[int] = None, +show_counts: Optional[bool] = None, ) -> None: """ Print a concise summary of a DataFrame. @@ -11973,10 +11973,10 @@ defaultdict(, {'col..., 'col...})] When to switch from the verbose to the truncated output. If the DataFrame has more than `max_cols` columns, the truncated output is used. -null_counts : bool, optional +show_counts : bool, optional Whether to show the non-null counts. -.. deprecated:: 3.4.0 +.. 
versionadded:: 4.0.0 Returns --- @@ -12066,6 +12066,7 @@ defaultdict(, {'col..., 'col...})] buf=buf, max_cols=max_cols, memory_usage=False, +show_counts=show_counts, # type: ignore ) finally: del self._data diff --git a/python/pyspark/pandas/indexes/base.py b/python/pyspark/pandas/indexes/base.py index 4c2ab137435..bfde7e554ba 100644 --- a/python/pyspark/pandas/indexes/base.py +++ b/python/pyspark/pandas/indexes/base.py @@ -289,7 +289,7 @@ class Index(IndexOpsMixin): if name is None: name = type(self).__name__ -return "%s: %s entries%s" % (name, total_count, index_summary) +return "%s: %s entries%s" % (name, int(total_count), index_summary) @property def size(self) -> int: diff --git a/python/pyspark/pandas/tests/io/test_io.py b/python/pyspark/pandas/tests/io/test_io.py index 4eadc6a7eb5..59812ae3d5a 100644 --- a/python/pyspark/pandas/tests/io/test_io.py +++ b/python/pyspark/pandas/tests/io/test_io.py @@ -16,6 +16,7 @@ # from distutils.version import LooseVersion import unittest +from io
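A short usage sketch, assuming a pandas-on-Spark build with this change (Spark 4.0+), where `show_counts` replaces the removed `null_counts` parameter:

```python
# DataFrame.info with the new show_counts flag; column values are illustrative.
import pyspark.pandas as ps

psdf = ps.DataFrame({"a": [1, None, 3], "b": ["x", "y", None]})

psdf.info(show_counts=True)   # include non-null counts per column
psdf.info(show_counts=False)  # skip the (potentially costly) counting
```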
[spark] branch master updated (d17a8613a68 -> ebdbc7b1d3e)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

from d17a8613a68 [SPARK-45047][PYTHON][CONNECT] `DataFrame.groupBy` support ordinals
add ebdbc7b1d3e [SPARK-45064][PYTHON][CONNECT] Add the missing `scale` parameter in `ceil/ceiling`

No new revisions were added by this update.

Summary of changes:
python/pyspark/sql/connect/functions.py | 16 ++---
python/pyspark/sql/functions.py | 58 +
2 files changed, 57 insertions(+), 17 deletions(-)

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
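The SPARK-45064 change is summarized above only by its diffstat. A hedged sketch of what the added parameter looks like from Python, assuming a build that includes it (the exact accepted types for `scale` should be checked against the version in use):

```python
# ceil/ceiling with the optional scale argument, mirroring the SQL functions
# ceil(expr, scale) / ceiling(expr, scale).
from pyspark.sql import SparkSession, functions as sf

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(3.1415,)], ["x"])

df.select(
    sf.ceil("x"),                # no scale: 4
    sf.ceil("x", 2),             # round up at 2 decimal places: 3.15
    sf.ceiling("x", sf.lit(1)),  # scale given as a Column: 3.2
).show()
```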
[spark] branch master updated: [SPARK-45047][PYTHON][CONNECT] `DataFrame.groupBy` support ordinals
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new d17a8613a68 [SPARK-45047][PYTHON][CONNECT] `DataFrame.groupBy` support ordinals d17a8613a68 is described below commit d17a8613a68af076bc796881831382c29df4d90e Author: Ruifeng Zheng AuthorDate: Mon Sep 4 15:23:08 2023 -0700 [SPARK-45047][PYTHON][CONNECT] `DataFrame.groupBy` support ordinals ### What changes were proposed in this pull request? make `DataFrame.groupBy` accept ordinals ### Why are the changes needed? for feature parity ``` select target_country, ua_date, sum(spending_usd) from df group by 2, 1 order by 2, 3 desc ``` this PR focus on the `groupBy` method ### Does this PR introduce _any_ user-facing change? yes, new feature ``` In [2]: from pyspark.sql import functions as sf In [3]: df = spark.createDataFrame([(1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2)], ["a", "b"]) In [4]: df.select("a", sf.lit(1), "b").groupBy("a", 2).agg(sf.sum("b")).show() +---+---+--+ | a| 1|sum(b)| +---+---+--+ | 1| 1| 3| | 2| 1| 3| | 3| 1| 3| +---+---+--+ ``` ### How was this patch tested? added ut ### Was this patch authored or co-authored using generative AI tooling? no Closes #42767 from zhengruifeng/py_groupby_index. Authored-by: Ruifeng Zheng Signed-off-by: Dongjoon Hyun --- python/pyspark/sql/_typing.pyi | 1 + python/pyspark/sql/connect/_typing.py | 2 + python/pyspark/sql/connect/dataframe.py| 9 ++- python/pyspark/sql/dataframe.py| 66 -- python/pyspark/sql/tests/test_group.py | 61 python/pyspark/sql/tests/typing/test_dataframe.yml | 2 +- 6 files changed, 133 insertions(+), 8 deletions(-) diff --git a/python/pyspark/sql/_typing.pyi b/python/pyspark/sql/_typing.pyi index 3d095f55709..cee44c4aa06 100644 --- a/python/pyspark/sql/_typing.pyi +++ b/python/pyspark/sql/_typing.pyi @@ -36,6 +36,7 @@ from pyspark.sql.column import Column ColumnOrName = Union[Column, str] ColumnOrName_ = TypeVar("ColumnOrName_", bound=ColumnOrName) +ColumnOrNameOrOrdinal = Union[Column, str, int] DecimalLiteral = decimal.Decimal DateTimeLiteral = Union[datetime.datetime, datetime.date] LiteralType = PrimitiveType diff --git a/python/pyspark/sql/connect/_typing.py b/python/pyspark/sql/connect/_typing.py index 4c76e37659c..471af24f40d 100644 --- a/python/pyspark/sql/connect/_typing.py +++ b/python/pyspark/sql/connect/_typing.py @@ -37,6 +37,8 @@ from pyspark.sql.streaming.state import GroupState ColumnOrName = Union[Column, str] +ColumnOrNameOrOrdinal = Union[Column, str, int] + PrimitiveType = Union[bool, float, int, str] OptionalPrimitiveType = Optional[PrimitiveType] diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index c42de589f8d..86a63536185 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -85,6 +85,7 @@ from pyspark.sql.pandas.types import from_arrow_schema if TYPE_CHECKING: from pyspark.sql.connect._typing import ( ColumnOrName, +ColumnOrNameOrOrdinal, LiteralType, PrimitiveType, OptionalPrimitiveType, @@ -476,7 +477,7 @@ class DataFrame: first.__doc__ = PySparkDataFrame.first.__doc__ -def groupBy(self, *cols: "ColumnOrName") -> GroupedData: +def groupBy(self, *cols: "ColumnOrNameOrOrdinal") -> GroupedData: if len(cols) == 1 and isinstance(cols[0], list): cols = cols[0] @@ -486,6 +487,12 @@ class DataFrame: _cols.append(c) elif isinstance(c, str): _cols.append(self[c]) +elif 
isinstance(c, int) and not isinstance(c, bool): +# TODO: should introduce dedicated error class +if c < 1: +raise IndexError(f"Column ordinal must be positive but got {c}") +# ordinal is 1-based +_cols.append(self[c - 1]) else: raise PySparkTypeError( error_class="NOT_COLUMN_OR_STR", diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 64592311a13..4b8bdd1c277 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -67,7 +67,12 @@ from pyspark.sql.pandas.map_ops import PandasMapOpsMixin if TYPE_CHECKING: from pyspark._typing import PrimitiveType from pyspark.pandas.frame import DataFrame as PandasOnSparkDataFrame -from pyspark.sql._typing import ColumnOrName, LiteralType,
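A compact sketch of the ordinal semantics added above: ordinals are 1-based positions into the DataFrame's columns, so the two groupings below are equivalent (the `IndexError` for non-positive ordinals is the behavior shown in the Connect code path of the diff).

```python
# groupBy by column name vs. by 1-based ordinal.
from pyspark.sql import SparkSession, functions as sf

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 10), (1, 20), (2, 30)], ["a", "b"])

df.groupBy("a").agg(sf.sum("b")).show()  # group by name
df.groupBy(1).agg(sf.sum("b")).show()    # same grouping via ordinal 1 -> "a"

# Names and ordinals can be mixed, e.g. df.groupBy("a", 2) groups by "a" and "b".
```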
[spark] branch master updated: [SPARK-45059][CONNECT][FOLLOWUP] Remove `try_reflect` problem filter rule in connect
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 96e1e86b0a8 [SPARK-45059][CONNECT][FOLLOWUP] Remove `try_reflect` problem filter rule in connect 96e1e86b0a8 is described below commit 96e1e86b0a8f296845d697b0019d7ed16864c938 Author: Jia Fan AuthorDate: Mon Sep 4 15:14:55 2023 -0700 [SPARK-45059][CONNECT][FOLLOWUP] Remove `try_reflect` problem filter rule in connect ### What changes were proposed in this pull request? This is a followup PR for #42783 , to remove `try_reflect` problem filter rule in spark connect. ### Why are the changes needed? make sure the `try_reflect` check work. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? exist test. ### Was this patch authored or co-authored using generative AI tooling? No Closes #42800 from Hisoka-X/SPARK-45059_remove_connect_check. Authored-by: Jia Fan Signed-off-by: Dongjoon Hyun --- .../spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala index bf512ed71fd..1e536cd37fe 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala @@ -208,8 +208,6 @@ object CheckConnectJvmClientCompatibility { // functions ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.unwrap_udt"), ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.udaf"), - ProblemFilters.exclude[DirectMissingMethodProblem]( -"org.apache.spark.sql.functions.try_reflect"), // KeyValueGroupedDataset ProblemFilters.exclude[Problem]( - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-45066][SQL][PYTHON][CONNECT] Make function `repeat` accept column-type `n`
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ba20eaa4c30 [SPARK-45066][SQL][PYTHON][CONNECT] Make function `repeat` accept column-type `n` ba20eaa4c30 is described below commit ba20eaa4c30aecb32ba2deb7bbf502bec929a297 Author: Ruifeng Zheng AuthorDate: Mon Sep 4 15:04:25 2023 -0700 [SPARK-45066][SQL][PYTHON][CONNECT] Make function `repeat` accept column-type `n` ### What changes were proposed in this pull request? Make function `repeat` accept column-type `n` ### Why are the changes needed? 1. to follow this guide: https://github.com/apache/spark/blob/5b609598503df603cbddd5e1adf8d2cb28a5f977/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L60-L62 2. especially, can replace [the internal function](https://github.com/apache/spark/blob/17fac569b4e4b569d41f761db07d7bf112801e0c/python/pyspark/pandas/spark/functions.py#L138-L143) in Pandas API (to make the PR clean, I will replace it in separate PR) ### Does this PR introduce _any_ user-facing change? yes ### How was this patch tested? NO ### Was this patch authored or co-authored using generative AI tooling? NO Closes #42794 from zhengruifeng/func_repeat_func. Authored-by: Ruifeng Zheng Signed-off-by: Dongjoon Hyun --- .../scala/org/apache/spark/sql/functions.scala | 8 + python/pyspark/sql/connect/functions.py| 5 +-- python/pyspark/sql/functions.py| 42 ++ .../scala/org/apache/spark/sql/functions.scala | 10 ++ 4 files changed, 57 insertions(+), 8 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala index 9ead800ace7..527848e95e6 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala @@ -4100,6 +4100,14 @@ object functions { */ def repeat(str: Column, n: Int): Column = Column.fn("repeat", str, lit(n)) + /** + * Repeats a string column n times, and returns it as a new string column. + * + * @group string_funcs + * @since 4.0.0 + */ + def repeat(str: Column, n: Column): Column = Column.fn("repeat", str, n) + /** * Trim the spaces from right end for the specified string value. 
* diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py index f290549ae47..19dd021ba08 100644 --- a/python/pyspark/sql/connect/functions.py +++ b/python/pyspark/sql/connect/functions.py @@ -2357,8 +2357,9 @@ def rpad(col: "ColumnOrName", len: int, pad: str) -> Column: rpad.__doc__ = pysparkfuncs.rpad.__doc__ -def repeat(col: "ColumnOrName", n: int) -> Column: -return _invoke_function("repeat", _to_col(col), lit(n)) +def repeat(col: "ColumnOrName", n: Union["ColumnOrName", int]) -> Column: +n = lit(n) if isinstance(n, int) else n +return _invoke_function("repeat", _to_col(col), _to_col(n)) repeat.__doc__ = pysparkfuncs.repeat.__doc__ diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 6e0caf50c16..699c8b9c8cf 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -10020,7 +10020,7 @@ def rpad(col: "ColumnOrName", len: int, pad: str) -> Column: @try_remote_functions -def repeat(col: "ColumnOrName", n: int) -> Column: +def repeat(col: "ColumnOrName", n: Union["ColumnOrName", int]) -> Column: """ Repeats a string column n times, and returns it as a new string column. @@ -10033,9 +10033,12 @@ def repeat(col: "ColumnOrName", n: int) -> Column: -- col : :class:`~pyspark.sql.Column` or str target column to work on. -n : int +n : :class:`~pyspark.sql.Column` or str or int number of times to repeat value. +.. versionchanged:: 4.0.0 + `n` now accepts column and column name. + Returns --- :class:`~pyspark.sql.Column` @@ -10043,11 +10046,38 @@ def repeat(col: "ColumnOrName", n: int) -> Column: Examples ->>> df = spark.createDataFrame([('ab',)], ['s',]) ->>> df.select(repeat(df.s, 3).alias('s')).collect() -[Row(s='ababab')] +>>> import pyspark.sql.functions as sf +>>> spark.createDataFrame( +... [('ab',)], ['s',] +... ).select(sf.repeat("s", 3)).show() +++ +|repeat(s, 3)| +++ +| ababab| +++ + +>>> import pyspark.sql.functions as sf +>>> spark.createDataFrame( +... [('ab',)], ['s',] +... ).select(sf.repeat("s", sf.lit(4))).show() +++ +
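A small sketch of the widened signature, assuming a build containing SPARK-45066; the repetition count can now come from a column instead of only a Python int:

```python
# repeat() with a literal count, a Column count, and a column name as count.
from pyspark.sql import SparkSession, functions as sf

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("ab", 2), ("xy", 3)], ["s", "n"])

df.select(
    sf.repeat("s", 3),            # fixed count, as before: 'ababab', 'xyxyxy'
    sf.repeat("s", sf.col("n")),  # per-row count from column n: 'abab', 'xyxyxy'
    sf.repeat("s", "n"),          # a plain column name also works
).show()
```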
[spark] branch master updated: [SPARK-45063][PYTHON][DOCS] Refine docstring of `max_by/min_by`
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 798fce3b571 [SPARK-45063][PYTHON][DOCS] Refine docstring of `max_by/min_by` 798fce3b571 is described below commit 798fce3b571907ee52058004cc38c2e8dbc4b016 Author: yangjie01 AuthorDate: Mon Sep 4 14:48:14 2023 -0700 [SPARK-45063][PYTHON][DOCS] Refine docstring of `max_by/min_by` ### What changes were proposed in this pull request? This pr refine docstring of `max_by/min_by` and add some new examples. ### Why are the changes needed? To improve PySpark documentation ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass Github Actions ### Was this patch authored or co-authored using generative AI tooling? No Closes #42789 from LuciferYang/SPARK-45063. Authored-by: yangjie01 Signed-off-by: Dongjoon Hyun --- python/pyspark/sql/functions.py | 96 - 1 file changed, 86 insertions(+), 10 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index d025b13cd10..6e0caf50c16 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1346,7 +1346,9 @@ def min(col: "ColumnOrName") -> Column: @try_remote_functions def max_by(col: "ColumnOrName", ord: "ColumnOrName") -> Column: """ -Returns the value associated with the maximum value of ord. +Returns the value from the `col` parameter that is associated with the maximum value +from the `ord` parameter. This function is often used to find the `col` parameter value +corresponding to the maximum `ord` parameter value within each group when used with groupBy(). .. versionadded:: 3.3.0 @@ -1356,28 +1358,64 @@ def max_by(col: "ColumnOrName", ord: "ColumnOrName") -> Column: Parameters -- col : :class:`~pyspark.sql.Column` or str -target column to compute on. +The column representing the values to be returned. This could be the column instance +or the column name as string. ord : :class:`~pyspark.sql.Column` or str -column to be maximized +The column that needs to be maximized. This could be the column instance +or the column name as string. Returns --- :class:`~pyspark.sql.Column` -value associated with the maximum value of ord. +A column object representing the value from `col` that is associated with +the maximum value from `ord`. Examples +Example 1: Using `max_by` with groupBy + +>>> import pyspark.sql.functions as sf >>> df = spark.createDataFrame([ ... ("Java", 2012, 2), ("dotNET", 2012, 5000), ... ("dotNET", 2013, 48000), ("Java", 2013, 3)], ... schema=("course", "year", "earnings")) ->>> df.groupby("course").agg(max_by("year", "earnings")).show() +>>> df.groupby("course").agg(sf.max_by("year", "earnings")).show() +--+--+ |course|max_by(year, earnings)| +--+--+ | Java| 2013| |dotNET| 2013| +--+--+ + +Example 2: Using `max_by` with different data types + +>>> import pyspark.sql.functions as sf +>>> df = spark.createDataFrame([ +... ("Marketing", "Anna", 4), ("IT", "Bob", 2), +... ("IT", "Charlie", 3), ("Marketing", "David", 1)], +... schema=("department", "name", "years_in_dept")) +>>> df.groupby("department").agg(sf.max_by("name", "years_in_dept")).show() ++--+---+ +|department|max_by(name, years_in_dept)| ++--+---+ +|IT|Charlie| +| Marketing| Anna| ++--+---+ + +Example 3: Using `max_by` where `ord` has multiple maximum values + +>>> import pyspark.sql.functions as sf +>>> df = spark.createDataFrame([ +... 
("Consult", "Eva", 6), ("Finance", "Frank", 5), +... ("Finance", "George", 5), ("Consult", "Henry", 7)], +... schema=("department", "name", "years_in_dept")) +>>> df.groupby("department").agg(sf.max_by("name", "years_in_dept")).show() ++--+---+ +|department|max_by(name, years_in_dept)| ++--+---+ +| Consult| Henry| +| Finance| George| ++--+---+ """ return _invoke_function_over_columns("max_by", col, ord) @@ -1385,7 +1423,9 @@ def max_by(col: "ColumnOrName", ord: "ColumnOrName") -> Column:
[spark] branch master updated: [SPARK-45031][INFRA] Choose the right merge code path and merge hash for reopened PRs
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 792b37c1ab4 [SPARK-45031][INFRA] Choose the right merge code path and merge hash for reopened PRs 792b37c1ab4 is described below commit 792b37c1ab4658b81f2e5f06d28e438af53988fb Author: Kent Yao AuthorDate: Mon Sep 4 14:45:45 2023 -0700 [SPARK-45031][INFRA] Choose the right merge code path and merge hash for reopened PRs ### What changes were proposed in this pull request? When determining to cherry-pick a PR, we also check the PR is in the closed state; Otherwise, we assume it gets reverted and reopened, and we go normal merging. When cherry-picking, we shall select the merge hash from the latest commit-close event instead of the oldest. This is a bug fix ### Why are the changes needed? Bugfix ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? mutually ### Was this patch authored or co-authored using generative AI tooling? no Closes #42749 from yaooqinn/SPARK-45031. Authored-by: Kent Yao Signed-off-by: Dongjoon Hyun --- dev/merge_spark_pr.py | 9 +++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/dev/merge_spark_pr.py b/dev/merge_spark_pr.py index 23f5af7daca..4021999f19b 100755 --- a/dev/merge_spark_pr.py +++ b/dev/merge_spark_pr.py @@ -615,8 +615,13 @@ def main(): # Instead, they're closed by committers. merge_commits = [e for e in pr_events if e["event"] == "closed" and e["commit_id"] is not None] -if merge_commits: -merge_hash = merge_commits[0]["commit_id"] +if merge_commits and pr["state"] == "closed": +# A PR might have multiple merge commits, if it's reopened and merged again. We shall +# cherry-pick PRs in closed state with the latest merge hash. +# If the PR is still open(reopened), we shall not cherry-pick it but perform the normal +# merge as it could have been reverted earlier. +merge_commits = sorted(merge_commits, key=lambda x: x["created_at"]) +merge_hash = merge_commits[-1]["commit_id"] message = get_json("%s/commits/%s" % (GITHUB_API_BASE, merge_hash))["commit"]["message"] print("Pull request %s has already been merged, assuming you want to backport" % pr_num) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.4 updated: [SPARK-44940][SQL][3.4] Improve performance of JSON parsing when "spark.sql.json.enablePartialResults" is enabled
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new daf481d9505 [SPARK-44940][SQL][3.4] Improve performance of JSON parsing when "spark.sql.json.enablePartialResults" is enabled daf481d9505 is described below commit daf481d950564efc01fb99628dded08ad1f51ff2 Author: Ivan Sadikov AuthorDate: Mon Sep 4 14:39:06 2023 -0700 [SPARK-44940][SQL][3.4] Improve performance of JSON parsing when "spark.sql.json.enablePartialResults" is enabled ### What changes were proposed in this pull request? Backport of https://github.com/apache/spark/pull/42667 to branch-3.4. The PR improves JSON parsing when `spark.sql.json.enablePartialResults` is enabled: - Fixes the issue when using nested arrays `ClassCastException: org.apache.spark.sql.catalyst.util.GenericArrayData cannot be cast to org.apache.spark.sql.catalyst.InternalRow` - Improves parsing of the nested struct fields, e.g. `{"a1": "AAA", "a2": [{"f1": "", "f2": ""}], "a3": "id1", "a4": "XXX"}` used to be parsed as `|AAA|NULL|NULL|NULL|` and now is parsed as `|AAA|[{NULL, }]|id1|XXX|`. - Improves performance of nested JSON parsing. The initial implementation would throw too many exceptions when multiple nested fields failed to parse. When the config is disabled, it is not a problem because the entire record is marked as NULL. The internal benchmarks show the performance improvement from slowdown of over 160% to an improvement of 7-8% compared to the master branch when the flag is enabled. I will create a follow-up ticket to add a benchmark for this regression. ### Why are the changes needed? Fixes some corner cases in JSON parsing and improves performance when `spark.sql.json.enablePartialResults` is enabled. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? I added tests to verify nested structs, maps, and arrays can be parsed without affecting the subsequent fields in the JSON. I also updated the existing tests when `spark.sql.json.enablePartialResults` is enabled because we parse more data now. I added a benchmark to check performance. Before the change (master, https://github.com/apache/spark/commit/a45a3a3d60cb97b107a177ad16bfe36372bc3e9b): ``` [info] OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws [info] Intel(R) Xeon(R) Platinum 8375C CPU 2.90GHz [info] Partial JSON results: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative [info] [info] parse invalid JSON 9537 9820 452 0.0 953651.6 1.0X ``` After the change (this PR): ``` OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws Intel(R) Xeon(R) Platinum 8375C CPU 2.90GHz Partial JSON results: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative parse invalid JSON 3100 3106 6 0.0 309967.6 1.0X ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #42792 from sadikovi/SPARK-44940-3.4. 
Authored-by: Ivan Sadikov Signed-off-by: Dongjoon Hyun --- .../spark/sql/catalyst/json/JacksonParser.scala| 41 - .../sql/catalyst/util/BadRecordException.scala | 69 - .../spark/sql/errors/QueryExecutionErrors.scala| 12 +- sql/core/benchmarks/JsonBenchmark-results.txt | 153 ++- .../org/apache/spark/sql/JsonFunctionsSuite.scala | 20 ++- .../execution/datasources/json/JsonBenchmark.scala | 28 .../sql/execution/datasources/json/JsonSuite.scala | 170 - 7 files changed, 400 insertions(+), 93 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index d9bff3dc7ec..20b281332d0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -420,17 +420,17 @@ class JacksonParser( case VALUE_STRING if parser.getTextLength < 1 && allowEmptyString => dataType match { case FloatType | DoubleType |
[spark] branch branch-3.5 updated: [SPARK-44940][SQL][3.5] Improve performance of JSON parsing when "spark.sql.json.enablePartialResults" is enabled
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 0dea7db3660 [SPARK-44940][SQL][3.5] Improve performance of JSON parsing when "spark.sql.json.enablePartialResults" is enabled 0dea7db3660 is described below commit 0dea7db3660b9db7bbe075c31712e7119bfd1af7 Author: Ivan Sadikov AuthorDate: Mon Sep 4 14:37:32 2023 -0700 [SPARK-44940][SQL][3.5] Improve performance of JSON parsing when "spark.sql.json.enablePartialResults" is enabled ### What changes were proposed in this pull request? Backport of https://github.com/apache/spark/pull/42667 to branch-3.5. The PR improves JSON parsing when `spark.sql.json.enablePartialResults` is enabled: - Fixes the issue when using nested arrays `ClassCastException: org.apache.spark.sql.catalyst.util.GenericArrayData cannot be cast to org.apache.spark.sql.catalyst.InternalRow` - Improves parsing of the nested struct fields, e.g. `{"a1": "AAA", "a2": [{"f1": "", "f2": ""}], "a3": "id1", "a4": "XXX"}` used to be parsed as `|AAA|NULL|NULL|NULL|` and now is parsed as `|AAA|[{NULL, }]|id1|XXX|`. - Improves performance of nested JSON parsing. The initial implementation would throw too many exceptions when multiple nested fields failed to parse. When the config is disabled, it is not a problem because the entire record is marked as NULL. The internal benchmarks show the performance improvement from slowdown of over 160% to an improvement of 7-8% compared to the master branch when the flag is enabled. I will create a follow-up ticket to add a benchmark for this regression. ### Why are the changes needed? Fixes some corner cases in JSON parsing and improves performance when `spark.sql.json.enablePartialResults` is enabled. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? I added tests to verify nested structs, maps, and arrays can be parsed without affecting the subsequent fields in the JSON. I also updated the existing tests when `spark.sql.json.enablePartialResults` is enabled because we parse more data now. I added a benchmark to check performance. Before the change (master, https://github.com/apache/spark/commit/a45a3a3d60cb97b107a177ad16bfe36372bc3e9b): ``` [info] OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws [info] Intel(R) Xeon(R) Platinum 8375C CPU 2.90GHz [info] Partial JSON results: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative [info] [info] parse invalid JSON 9537 9820 452 0.0 953651.6 1.0X ``` After the change (this PR): ``` OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws Intel(R) Xeon(R) Platinum 8375C CPU 2.90GHz Partial JSON results: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative parse invalid JSON 3100 3106 6 0.0 309967.6 1.0X ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #42790 from sadikovi/SPARK-44940-3.5. 
Authored-by: Ivan Sadikov Signed-off-by: Dongjoon Hyun --- .../spark/sql/catalyst/json/JacksonParser.scala| 41 - .../sql/catalyst/util/BadRecordException.scala | 64 +++- .../spark/sql/errors/QueryExecutionErrors.scala| 12 +- sql/core/benchmarks/JsonBenchmark-results.txt | 152 +- .../org/apache/spark/sql/JsonFunctionsSuite.scala | 20 ++- .../execution/datasources/json/JsonBenchmark.scala | 28 .../sql/execution/datasources/json/JsonSuite.scala | 170 - 7 files changed, 393 insertions(+), 94 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 388edb9024c..f14f70532e6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -420,17 +420,17 @@ class JacksonParser( case VALUE_STRING if parser.getTextLength < 1 && allowEmptyString => dataType match { case FloatType | DoubleType |
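Both backports above change only the internals of partial JSON parsing; the user-facing switch stays the same. A sketch of what the flag controls, written with `from_json` so it runs on a plain session (column values are illustrative):

```python
# With spark.sql.json.enablePartialResults, a record with one malformed field
# keeps the fields that did parse instead of turning entirely null.
from pyspark.sql import SparkSession, functions as sf

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.json.enablePartialResults", "true")

df = spark.createDataFrame(
    [('{"a": {"x": 1}, "b": "ok"}',),
     ('{"a": {"x": "oops"}, "b": "still ok"}',)],  # a.x cannot be read as long
    ["json"])

parsed = df.select(sf.from_json("json", "a struct<x: long>, b string").alias("r"))
parsed.select("r.a", "r.b").show()
# Flag on:  row 2 keeps b = 'still ok' and only a.x is null.
# Flag off: the whole parsed struct for row 2 is null.
```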
[spark] branch master updated: [SPARK-45036][SQL] SPJ: Simplify the logic to handle partially clustered distribution
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 9e2aafb1373 [SPARK-45036][SQL] SPJ: Simplify the logic to handle partially clustered distribution 9e2aafb1373 is described below commit 9e2aafb13739f9c07f8218cd325c5532063b1a51 Author: Chao Sun AuthorDate: Mon Sep 4 14:05:14 2023 -0700 [SPARK-45036][SQL] SPJ: Simplify the logic to handle partially clustered distribution ### What changes were proposed in this pull request? In SPJ, currently the logic to handle partially clustered distribution is a bit complicated. For instance, when the feature is eanbled (by enabling both `conf.v2BucketingPushPartValuesEnabled` and `conf.v2BucketingPartiallyClusteredDistributionEnabled`), Spark should postpone the combining of input splits until it is about to create an input RDD in `BatchScanExec`. To implement this, `groupPartitions` in `DataSourceV2ScanExecBase` currently takes the flag as input and has two differen [...] This PR introduces a new field in `KeyGroupedPartitioning`, named `originalPartitionValues`, that is used to store the original partition values from input before splits combining has been applied. The field is used when partially clustered distribution is enabled. With this, `groupPartitions` becomes easier to understand. In addition, this also simplifies `BatchScanExec.inputRDD` by combining two branches where partially clustered distribution is not enabled. ### Why are the changes needed? To simplify the current logic in the SPJ w.r.t partially clustered distribution. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests. ### Was this patch authored or co-authored using generative AI tooling? Closes #42757 from sunchao/SPARK-45036. Authored-by: Chao Sun Signed-off-by: Dongjoon Hyun --- .../sql/catalyst/plans/physical/partitioning.scala | 35 +++--- .../execution/datasources/v2/BatchScanExec.scala | 117 + .../datasources/v2/DataSourceV2ScanExecBase.scala | 65 +++- .../execution/exchange/EnsureRequirements.scala| 9 +- .../execution/exchange/ShuffleExchangeExec.scala | 4 +- .../DistributionAndOrderingSuiteBase.scala | 6 +- .../connector/KeyGroupedPartitioningSuite.scala| 2 +- .../exchange/EnsureRequirementsSuite.scala | 2 +- 8 files changed, 122 insertions(+), 118 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index ce557422a08..0be4a61f275 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -312,26 +312,37 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) * Represents a partitioning where rows are split across partitions based on transforms defined * by `expressions`. `partitionValues`, if defined, should contain value of partition key(s) in * ascending order, after evaluated by the transforms in `expressions`, for each input partition. - * In addition, its length must be the same as the number of input partitions (and thus is a 1-1 - * mapping). The `partitionValues` may contain duplicated partition values. 
+ * In addition, its length must be the same as the number of Spark partitions (and thus is a 1-1 + * mapping), and each row in `partitionValues` must be unique. * - * For example, if `expressions` is `[years(ts_col)]`, then a valid value of `partitionValues` is - * `[0, 1, 2]`, which represents 3 input partitions with distinct partition values. All rows - * in each partition have the same value for column `ts_col` (which is of timestamp type), after - * being applied by the `years` transform. + * The `originalPartitionValues`, on the other hand, are partition values from the original input + * splits returned by data sources. It may contain duplicated values. * - * On the other hand, `[0, 0, 1]` is not a valid value for `partitionValues` since `0` is - * duplicated twice. + * For example, if a data source reports partition transform expressions `[years(ts_col)]` with 4 + * input splits whose corresponding partition values are `[0, 1, 2, 2]`, then the `expressions` + * in this case is `[years(ts_col)]`, while `partitionValues` is `[0, 1, 2]`, which + * represents 3 input partitions with distinct partition values. All rows in each partition have + * the same value for column `ts_col` (which is of timestamp type), after being applied by the + * `years`
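The refactoring above is internal to storage-partitioned join planning; the behavior it touches is gated by the flags named in the description. A hedged sketch of enabling them from Python — the SQL config keys shown are the ones these internal names are assumed to map to in Spark 3.4/3.5 and should be verified against the version in use:

```python
# Enable pushing down partition values and partially clustered distribution
# for storage-partitioned joins (assumes a V2 data source that reports its
# partitioning; config keys are assumptions, verify before relying on them).
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
# conf.v2BucketingPushPartValuesEnabled
spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true")
# conf.v2BucketingPartiallyClusteredDistributionEnabled
spark.conf.set(
    "spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", "true")
```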
[spark] branch master updated: [SPARK-45067][BUILD] Upgrade slf4j to 2.0.9
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 85d1c7f3a5d [SPARK-45067][BUILD] Upgrade slf4j to 2.0.9 85d1c7f3a5d is described below commit 85d1c7f3a5dd0a9162d93b80812a193d8ccfef18 Author: yangjie01 AuthorDate: Mon Sep 4 09:15:44 2023 -0500 [SPARK-45067][BUILD] Upgrade slf4j to 2.0.9 ### What changes were proposed in this pull request? This pr aims upgrade slf4j from 2.0.7 to 2.0.9 ### Why are the changes needed? The release notes as follows: - https://www.slf4j.org/news.html#2.0.9 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GitHub Actions ### Was this patch authored or co-authored using generative AI tooling? No Closes #42796 from LuciferYang/SPARK-45067. Authored-by: yangjie01 Signed-off-by: Sean Owen --- dev/deps/spark-deps-hadoop-3-hive-2.3 | 6 +++--- pom.xml | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3 index 59164c1f8f4..652127a9bb8 100644 --- a/dev/deps/spark-deps-hadoop-3-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3-hive-2.3 @@ -118,7 +118,7 @@ javassist/3.29.2-GA//javassist-3.29.2-GA.jar javax.jdo/3.2.0-m3//javax.jdo-3.2.0-m3.jar javolution/5.5.1//javolution-5.5.1.jar jaxb-runtime/2.3.2//jaxb-runtime-2.3.2.jar -jcl-over-slf4j/2.0.7//jcl-over-slf4j-2.0.7.jar +jcl-over-slf4j/2.0.9//jcl-over-slf4j-2.0.9.jar jdo-api/3.0.1//jdo-api-3.0.1.jar jdom2/2.0.6//jdom2-2.0.6.jar jersey-client/2.40//jersey-client-2.40.jar @@ -141,7 +141,7 @@ json4s-jackson_2.12/3.7.0-M11//json4s-jackson_2.12-3.7.0-M11.jar json4s-scalap_2.12/3.7.0-M11//json4s-scalap_2.12-3.7.0-M11.jar jsr305/3.0.0//jsr305-3.0.0.jar jta/1.1//jta-1.1.jar -jul-to-slf4j/2.0.7//jul-to-slf4j-2.0.7.jar +jul-to-slf4j/2.0.9//jul-to-slf4j-2.0.9.jar kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar kubernetes-client-api/6.8.1//kubernetes-client-api-6.8.1.jar kubernetes-client/6.8.1//kubernetes-client-6.8.1.jar @@ -233,7 +233,7 @@ scala-parser-combinators_2.12/2.3.0//scala-parser-combinators_2.12-2.3.0.jar scala-reflect/2.12.18//scala-reflect-2.12.18.jar scala-xml_2.12/2.2.0//scala-xml_2.12-2.2.0.jar shims/0.9.45//shims-0.9.45.jar -slf4j-api/2.0.7//slf4j-api-2.0.7.jar +slf4j-api/2.0.9//slf4j-api-2.0.9.jar snakeyaml-engine/2.6//snakeyaml-engine-2.6.jar snakeyaml/2.0//snakeyaml-2.0.jar snappy-java/1.1.10.3//snappy-java-1.1.10.3.jar diff --git a/pom.xml b/pom.xml index efd1c6ffdb9..a61d603fe1c 100644 --- a/pom.xml +++ b/pom.xml @@ -119,7 +119,7 @@ 3.1.0 spark 9.5 -2.0.7 +2.0.9 2.20.0 3.3.6 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-44890][BUILD] Update miswritten remarks
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ba1c2f3b383 [SPARK-44890][BUILD] Update miswritten remarks ba1c2f3b383 is described below commit ba1c2f3b38396c01739375d6e83ac84b581d951e Author: chenyu-opensource <119398199+chenyu-opensou...@users.noreply.github.com> AuthorDate: Mon Sep 4 09:12:33 2023 -0500 [SPARK-44890][BUILD] Update miswritten remarks ### What changes were proposed in this pull request? The PR updates miswritten remarks in pom.xml ### Why are the changes needed? More accurate and standardized ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? It doesn't need to. It is annotation information that does not affect actual operation ### Was this patch authored or co-authored using generative AI tooling? No Closes #42598 from chenyu-opensource/master. Authored-by: chenyu-opensource <119398199+chenyu-opensou...@users.noreply.github.com> Signed-off-by: Sean Owen --- pom.xml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index 8edc3fd550c..efd1c6ffdb9 100644 --- a/pom.xml +++ b/pom.xml @@ -153,7 +153,7 @@ 2.5.1 2.0.8 4.2.19 @@ -175,7 +175,7 @@ 2.12.18 2.12 2.2.0 - + 4.8.0 false 2.16.0 @@ -204,7 +204,7 @@ 3.1.9 2.40 2.12.5 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.5 updated: [SPARK-45042][BUILD][3.5] Upgrade jetty to 9.4.52.v20230823
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 950b2f29105 [SPARK-45042][BUILD][3.5] Upgrade jetty to 9.4.52.v20230823 950b2f29105 is described below commit 950b2f29105cd66355eef10503a93d678087c79e Author: panbingkun AuthorDate: Mon Sep 4 09:01:50 2023 -0500 [SPARK-45042][BUILD][3.5] Upgrade jetty to 9.4.52.v20230823 ### What changes were proposed in this pull request? The pr aims to Upgrade jetty from 9.4.51.v20230217 to 9.4.52.v20230823. (Backport to Spark 3.5.0) ### Why are the changes needed? - This is a release of the https://github.com/eclipse/jetty.project/issues/7958 that was sponsored by a [support contract from Webtide.com](mailto:saleswebtide.com) - The newest version fix a possible security issue: This release provides a workaround for Security Advisory https://github.com/advisories/GHSA-58qw-p7qm-5rvh - The release note as follows: https://github.com/eclipse/jetty.project/releases/tag/jetty-9.4.52.v20230823 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #42795 from panbingkun/branch-3.5_SPARK-45042. Authored-by: panbingkun Signed-off-by: Sean Owen --- dev/deps/spark-deps-hadoop-3-hive-2.3 | 4 ++-- pom.xml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3 index b6aba589d5f..1d02f8dba56 100644 --- a/dev/deps/spark-deps-hadoop-3-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3-hive-2.3 @@ -130,8 +130,8 @@ jersey-container-servlet/2.40//jersey-container-servlet-2.40.jar jersey-hk2/2.40//jersey-hk2-2.40.jar jersey-server/2.40//jersey-server-2.40.jar jettison/1.1//jettison-1.1.jar -jetty-util-ajax/9.4.51.v20230217//jetty-util-ajax-9.4.51.v20230217.jar -jetty-util/9.4.51.v20230217//jetty-util-9.4.51.v20230217.jar +jetty-util-ajax/9.4.52.v20230823//jetty-util-ajax-9.4.52.v20230823.jar +jetty-util/9.4.52.v20230823//jetty-util-9.4.52.v20230823.jar jline/2.14.6//jline-2.14.6.jar joda-time/2.12.5//joda-time-2.12.5.jar jodd-core/3.5.2//jodd-core-3.5.2.jar diff --git a/pom.xml b/pom.xml index 154ca4005f6..8fc4b89a78c 100644 --- a/pom.xml +++ b/pom.xml @@ -143,7 +143,7 @@ 1.13.1 1.9.1 shaded-protobuf -9.4.51.v20230217 +9.4.52.v20230823 4.0.3 0.10.0
[spark] branch branch-3.4 updated: [SPARK-44846][SQL] Convert the lower redundant Aggregate to Project in RemoveRedundantAggregates
This is an automated email from the ASF dual-hosted git repository. yumwang pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new 5a1c6e6ffe2 [SPARK-44846][SQL] Convert the lower redundant Aggregate to Project in RemoveRedundantAggregates 5a1c6e6ffe2 is described below commit 5a1c6e6ffe244461d23de98ddb317904db19fc4b Author: zml1206 AuthorDate: Mon Sep 4 20:23:39 2023 +0800 [SPARK-44846][SQL] Convert the lower redundant Aggregate to Project in RemoveRedundantAggregates ### What changes were proposed in this pull request? This PR provides a safe way to remove a redundant `Aggregate` in rule `RemoveRedundantAggregates`. Just convert the lower redundant `Aggregate` to `Project`. ### Why are the changes needed? The aggregate contains complex grouping expressions after `RemoveRedundantAggregates`, if `aggregateExpressions` has (if / case) branches, it is possible that `groupingExpressions` is no longer a subexpression of `aggregateExpressions` after execute `PushFoldableIntoBranches` rule, Then cause `boundReference` error. For example ``` SELECT c * 2 AS d FROM ( SELECT if(b > 1, 1, b) AS c FROM ( SELECT if(a < 0, 0, a) AS b FROM VALUES (-1), (1), (2) AS t1(a) ) t2 GROUP BY b ) t3 GROUP BY c ``` Before pr ``` == Optimized Logical Plan == Aggregate [if ((b#0 > 1)) 1 else b#0], [if ((b#0 > 1)) 2 else (b#0 * 2) AS d#2] +- Project [if ((a#3 < 0)) 0 else a#3 AS b#0] +- LocalRelation [a#3] ``` ``` == Error == Couldn't find b#0 in [if ((b#0 > 1)) 1 else b#0#7] java.lang.IllegalStateException: Couldn't find b#0 in [if ((b#0 > 1)) 1 else b#0#7] at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80) at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:73) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:466) at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren(TreeNode.scala:1241) at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren$(TreeNode.scala:1240) at org.apache.spark.sql.catalyst.expressions.BinaryExpression.mapChildren(Expression.scala:653) .. ``` After pr ``` == Optimized Logical Plan == Aggregate [c#1], [(c#1 * 2) AS d#2] +- Project [if ((b#0 > 1)) 1 else b#0 AS c#1] +- Project [if ((a#3 < 0)) 0 else a#3 AS b#0] +- LocalRelation [a#3] ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #42633 from zml1206/SPARK-44846-2. 
Authored-by: zml1206 Signed-off-by: Yuming Wang (cherry picked from commit 32a87f03da7eef41161a5a7a3aba4a48e0421912) Signed-off-by: Yuming Wang --- .../optimizer/RemoveRedundantAggregates.scala | 19 ++- .../optimizer/RemoveRedundantAggregatesSuite.scala | 21 - .../test/resources/sql-tests/inputs/group-by.sql| 13 + .../resources/sql-tests/results/group-by.sql.out| 18 ++ 4 files changed, 45 insertions(+), 26 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala index 2104bce3711..0c3d5bcf01a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.catalyst.optimizer -import org.apache.spark.sql.catalyst.analysis.PullOutNondeterministic import org.apache.spark.sql.catalyst.expressions.{AliasHelper, AttributeSet, ExpressionSet} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project} @@ -32,22 +31,8 @@ object RemoveRedundantAggregates extends Rule[LogicalPlan] with AliasHelper { def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning( _.containsPattern(AGGREGATE), ruleId) { case upper @ Aggregate(_,
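The reproduction quoted in the commit message above can be run directly against a Spark session. The sketch below is illustrative only: it assumes a local SparkSession, and the expected output values are derived from the VALUES clause rather than taken from the email.

```python
# Minimal repro sketch for the quoted query (assumption: a local SparkSession is available).
# Without the fix, collecting the result is expected to fail with the IllegalStateException
# shown above; with the fix, the lower aggregate is rewritten into a Project and the query
# should return d = 0 and d = 2.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("SPARK-44846-repro").getOrCreate()

df = spark.sql("""
    SELECT c * 2 AS d FROM (
        SELECT if(b > 1, 1, b) AS c FROM (
            SELECT if(a < 0, 0, a) AS b FROM VALUES (-1), (1), (2) AS t1(a)
        ) t2 GROUP BY b
    ) t3 GROUP BY c
""")
df.explain()  # the optimized plan should show Project nodes under a single Aggregate
df.show()
```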
[spark] branch branch-3.5 updated: [SPARK-44846][SQL] Convert the lower redundant Aggregate to Project in RemoveRedundantAggregates
This is an automated email from the ASF dual-hosted git repository. yumwang pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 5c801fc1171 [SPARK-44846][SQL] Convert the lower redundant Aggregate to Project in RemoveRedundantAggregates 5c801fc1171 is described below commit 5c801fc11718293d76c39c9d79d943fec9103ae4 Author: zml1206 AuthorDate: Mon Sep 4 20:23:39 2023 +0800 [SPARK-44846][SQL] Convert the lower redundant Aggregate to Project in RemoveRedundantAggregates ### What changes were proposed in this pull request? This PR provides a safe way to remove a redundant `Aggregate` in the `RemoveRedundantAggregates` rule: instead of dropping it, the lower redundant `Aggregate` is converted into a `Project`. ### Why are the changes needed? After `RemoveRedundantAggregates`, the remaining aggregate may contain complex grouping expressions; if the `aggregateExpressions` contain (if / case) branches, the `groupingExpressions` may no longer be subexpressions of the `aggregateExpressions` after the `PushFoldableIntoBranches` rule runs, which then causes a `boundReference` error. For example ``` SELECT c * 2 AS d FROM ( SELECT if(b > 1, 1, b) AS c FROM ( SELECT if(a < 0, 0, a) AS b FROM VALUES (-1), (1), (2) AS t1(a) ) t2 GROUP BY b ) t3 GROUP BY c ``` Before this PR ``` == Optimized Logical Plan == Aggregate [if ((b#0 > 1)) 1 else b#0], [if ((b#0 > 1)) 2 else (b#0 * 2) AS d#2] +- Project [if ((a#3 < 0)) 0 else a#3 AS b#0] +- LocalRelation [a#3] ``` ``` == Error == Couldn't find b#0 in [if ((b#0 > 1)) 1 else b#0#7] java.lang.IllegalStateException: Couldn't find b#0 in [if ((b#0 > 1)) 1 else b#0#7] at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80) at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:73) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:466) at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren(TreeNode.scala:1241) at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren$(TreeNode.scala:1240) at org.apache.spark.sql.catalyst.expressions.BinaryExpression.mapChildren(Expression.scala:653) .. ``` After this PR ``` == Optimized Logical Plan == Aggregate [c#1], [(c#1 * 2) AS d#2] +- Project [if ((b#0 > 1)) 1 else b#0 AS c#1] +- Project [if ((a#3 < 0)) 0 else a#3 AS b#0] +- LocalRelation [a#3] ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #42633 from zml1206/SPARK-44846-2.
Authored-by: zml1206 Signed-off-by: Yuming Wang (cherry picked from commit 32a87f03da7eef41161a5a7a3aba4a48e0421912) Signed-off-by: Yuming Wang --- .../optimizer/RemoveRedundantAggregates.scala | 19 ++- .../optimizer/RemoveRedundantAggregatesSuite.scala | 21 - .../sql-tests/analyzer-results/group-by.sql.out | 21 + .../test/resources/sql-tests/inputs/group-by.sql| 13 + .../resources/sql-tests/results/group-by.sql.out| 18 ++ 5 files changed, 66 insertions(+), 26 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala index 93f3557a8c8..badf4065f5f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.catalyst.optimizer -import org.apache.spark.sql.catalyst.analysis.PullOutNondeterministic import org.apache.spark.sql.catalyst.expressions.{AliasHelper, AttributeSet, ExpressionSet} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project} @@ -32,22 +31,8 @@ object RemoveRedundantAggregates extends Rule[LogicalPlan] with AliasHelper { def apply(plan: LogicalPlan): LogicalPlan =
[spark] branch master updated: [SPARK-44846][SQL] Convert the lower redundant Aggregate to Project in RemoveRedundantAggregates
This is an automated email from the ASF dual-hosted git repository. yumwang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 32a87f03da7 [SPARK-44846][SQL] Convert the lower redundant Aggregate to Project in RemoveRedundantAggregates 32a87f03da7 is described below commit 32a87f03da7eef41161a5a7a3aba4a48e0421912 Author: zml1206 AuthorDate: Mon Sep 4 20:23:39 2023 +0800 [SPARK-44846][SQL] Convert the lower redundant Aggregate to Project in RemoveRedundantAggregates ### What changes were proposed in this pull request? This PR provides a safe way to remove a redundant `Aggregate` in the `RemoveRedundantAggregates` rule: instead of dropping it, the lower redundant `Aggregate` is converted into a `Project`. ### Why are the changes needed? After `RemoveRedundantAggregates`, the remaining aggregate may contain complex grouping expressions; if the `aggregateExpressions` contain (if / case) branches, the `groupingExpressions` may no longer be subexpressions of the `aggregateExpressions` after the `PushFoldableIntoBranches` rule runs, which then causes a `boundReference` error. For example ``` SELECT c * 2 AS d FROM ( SELECT if(b > 1, 1, b) AS c FROM ( SELECT if(a < 0, 0, a) AS b FROM VALUES (-1), (1), (2) AS t1(a) ) t2 GROUP BY b ) t3 GROUP BY c ``` Before this PR ``` == Optimized Logical Plan == Aggregate [if ((b#0 > 1)) 1 else b#0], [if ((b#0 > 1)) 2 else (b#0 * 2) AS d#2] +- Project [if ((a#3 < 0)) 0 else a#3 AS b#0] +- LocalRelation [a#3] ``` ``` == Error == Couldn't find b#0 in [if ((b#0 > 1)) 1 else b#0#7] java.lang.IllegalStateException: Couldn't find b#0 in [if ((b#0 > 1)) 1 else b#0#7] at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80) at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:73) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:466) at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren(TreeNode.scala:1241) at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren$(TreeNode.scala:1240) at org.apache.spark.sql.catalyst.expressions.BinaryExpression.mapChildren(Expression.scala:653) .. ``` After this PR ``` == Optimized Logical Plan == Aggregate [c#1], [(c#1 * 2) AS d#2] +- Project [if ((b#0 > 1)) 1 else b#0 AS c#1] +- Project [if ((a#3 < 0)) 0 else a#3 AS b#0] +- LocalRelation [a#3] ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #42633 from zml1206/SPARK-44846-2.
Authored-by: zml1206 Signed-off-by: Yuming Wang --- .../optimizer/RemoveRedundantAggregates.scala | 19 ++- .../optimizer/RemoveRedundantAggregatesSuite.scala | 21 - .../sql-tests/analyzer-results/group-by.sql.out | 21 + .../test/resources/sql-tests/inputs/group-by.sql| 13 + .../resources/sql-tests/results/group-by.sql.out| 18 ++ 5 files changed, 66 insertions(+), 26 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala index 93f3557a8c8..badf4065f5f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.catalyst.optimizer -import org.apache.spark.sql.catalyst.analysis.PullOutNondeterministic import org.apache.spark.sql.catalyst.expressions.{AliasHelper, AttributeSet, ExpressionSet} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project} @@ -32,22 +31,8 @@ object RemoveRedundantAggregates extends Rule[LogicalPlan] with AliasHelper { def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning( _.containsPattern(AGGREGATE), ruleId) { case upper @ Aggregate(_, _, lower: Aggregate) if
[spark] branch master updated (b0b7835bee2 -> 416207659aa)
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from b0b7835bee2 [SPARK-45059][CONNECT][PYTHON] Add `try_reflect` functions to Scala and Python add 416207659aa [SPARK-45033][SQL] Support maps by parameterized `sql()` No new revisions were added by this update. Summary of changes: .../spark/sql/catalyst/analysis/parameters.scala | 15 -- .../org/apache/spark/sql/ParametersSuite.scala | 62 +- 2 files changed, 72 insertions(+), 5 deletions(-)
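The summary above lists only the touched files, so here is a hedged sketch of the user-visible effect of SPARK-45033. The exact set of accepted map constructors is defined by the PR itself; the `create_map`-based argument below is an assumption for illustration.

```python
# Illustration (assumption): bind a named parameter of spark.sql() to a map built
# from literal expressions, which this change is expected to allow.
from pyspark.sql import SparkSession
from pyspark.sql.functions import create_map, lit

spark = SparkSession.builder.getOrCreate()

df = spark.sql(
    "SELECT element_at(:m, 'a') AS v",
    args={"m": create_map(lit("a"), lit(1), lit("b"), lit(2))},
)
df.show()  # expected: one row with v = 1
```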
[spark] branch master updated: [SPARK-45059][CONNECT][PYTHON] Add `try_reflect` functions to Scala and Python
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new b0b7835bee2 [SPARK-45059][CONNECT][PYTHON] Add `try_reflect` functions to Scala and Python b0b7835bee2 is described below commit b0b7835bee2837c6e2875547aca259e02d2b0af7 Author: Jia Fan AuthorDate: Mon Sep 4 16:28:08 2023 +0800 [SPARK-45059][CONNECT][PYTHON] Add `try_reflect` functions to Scala and Python ### What changes were proposed in this pull request? Add the new `try_reflect` function to Python and Connect. ### Why are the changes needed? For parity. ### Does this PR introduce _any_ user-facing change? Yes, new function. ### How was this patch tested? Added a new test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #42783 from Hisoka-X/SPARK-45059_try_reflect_to_python. Authored-by: Jia Fan Signed-off-by: Ruifeng Zheng --- .../scala/org/apache/spark/sql/functions.scala | 9 ++ .../apache/spark/sql/PlanGenerationTestSuite.scala | 4 +++ .../explain-results/function_try_reflect.explain | 2 ++ .../query-tests/queries/function_try_reflect.json | 33 + .../queries/function_try_reflect.proto.bin | Bin 0 -> 216 bytes .../source/reference/pyspark.sql/functions.rst | 1 + python/pyspark/sql/connect/functions.py| 7 + python/pyspark/sql/functions.py| 29 ++ python/pyspark/sql/tests/test_functions.py | 2 -- .../scala/org/apache/spark/sql/functions.scala | 3 +- 10 files changed, 87 insertions(+), 3 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala index baafdd4e172..9ead800ace7 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala @@ -3629,6 +3629,15 @@ object functions { */ def java_method(cols: Column*): Column = Column.fn("java_method", cols: _*) + /** + * This is a special version of `reflect` that performs the same operation, but returns a NULL + * value instead of raising an error if the invoked method throws an exception. + * + * @group misc_funcs + * @since 4.0.0 + */ + def try_reflect(cols: Column*): Column = Column.fn("try_reflect", cols: _*) + /** * Returns the Spark version. The string contains 2 fields, the first being a release version * and the second being a git revision.
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala index c457f269213..aa15fbd75ff 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala @@ -2864,6 +2864,10 @@ class PlanGenerationTestSuite fn.java_method(lit("java.util.UUID"), lit("fromString"), fn.col("g")) } + functionTest("try_reflect") { +fn.try_reflect(lit("java.util.UUID"), lit("fromString"), fn.col("g")) + } + functionTest("typeof") { fn.typeof(fn.col("g")) } diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_reflect.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_reflect.explain new file mode 100644 index 000..5c68f3bf2c1 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_reflect.explain @@ -0,0 +1,2 @@ +Project [reflect(java.util.UUID, fromString, g#0, false) AS try_reflect(java.util.UUID, fromString, g)#0] ++- LocalRelation , [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_try_reflect.json b/connector/connect/common/src/test/resources/query-tests/queries/function_try_reflect.json new file mode 100644 index 000..de3fae90c2c --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_try_reflect.json @@ -0,0 +1,33 @@ +{ + "common": { +"planId": "1" + }, + "project": { +"input": { + "common": { +"planId": "0" + }, + "localRelation": { +"schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } +}, +"expressions": [{ + "unresolvedFunction": { +"functionName":
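As a usage illustration of the new function (mirroring the `java.util.UUID.fromString` call used in the test above), the following PySpark sketch assumes a build where `try_reflect` is available (`@since 4.0.0` per the Scala doc); the sample UUID string is made up for the example.

```python
# try_reflect returns NULL instead of raising when the reflected Java call throws.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, try_reflect

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [("886d8ee4-9b5d-4f26-ad0a-a89a5a7a3f5f",), ("not-a-uuid",)], ["g"]
)
# java_method()/reflect() would fail on the malformed row; try_reflect() is expected
# to yield NULL for it and the parsed UUID for the valid row.
df.select(try_reflect(lit("java.util.UUID"), lit("fromString"), col("g"))).show(truncate=False)
```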
[spark] branch branch-3.5 updated: [SPARK-45052][SQL][PYTHON][CONNECT][3.5] Make function aliases output column name consistent with SQL
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 6112d78cba2 [SPARK-45052][SQL][PYTHON][CONNECT][3.5] Make function aliases output column name consistent with SQL 6112d78cba2 is described below commit 6112d78cba20fd2e9aa298190371dd52205dc762 Author: Ruifeng Zheng AuthorDate: Mon Sep 4 16:24:43 2023 +0800 [SPARK-45052][SQL][PYTHON][CONNECT][3.5] Make function aliases output column name consistent with SQL ### What changes were proposed in this pull request? backport https://github.com/apache/spark/pull/42775 to 3.5 ### Why are the changes needed? to make `func(col)` consistent with `expr(func(col))` ### Does this PR introduce _any_ user-facing change? yes ### How was this patch tested? ci ### Was this patch authored or co-authored using generative AI tooling? no Closes #42786 from zhengruifeng/try_column_name_35. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- .../scala/org/apache/spark/sql/functions.scala | 12 +- .../query-tests/explain-results/describe.explain | 2 +- .../explain-results/function_ceiling.explain | 2 +- .../explain-results/function_ceiling_scale.explain | 2 +- .../explain-results/function_printf.explain| 2 +- .../explain-results/function_sign.explain | 2 +- .../explain-results/function_std.explain | 2 +- .../query-tests/queries/function_ceiling.json | 2 +- .../query-tests/queries/function_ceiling.proto.bin | Bin 173 -> 176 bytes .../queries/function_ceiling_scale.json| 2 +- .../queries/function_ceiling_scale.proto.bin | Bin 179 -> 182 bytes .../query-tests/queries/function_printf.json | 2 +- .../query-tests/queries/function_printf.proto.bin | Bin 196 -> 189 bytes .../query-tests/queries/function_sign.json | 2 +- .../query-tests/queries/function_sign.proto.bin| Bin 175 -> 173 bytes .../query-tests/queries/function_std.json | 2 +- .../query-tests/queries/function_std.proto.bin | Bin 175 -> 172 bytes python/pyspark/sql/connect/functions.py| 26 +- python/pyspark/sql/functions.py| 714 +++-- .../scala/org/apache/spark/sql/functions.scala | 202 +++--- 20 files changed, 628 insertions(+), 348 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala index fa8c5782e06..fe992ae6740 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala @@ -987,7 +987,7 @@ object functions { * @group agg_funcs * @since 3.5.0 */ - def std(e: Column): Column = stddev(e) + def std(e: Column): Column = Column.fn("std", e) /** * Aggregate function: alias for `stddev_samp`. @@ -2337,7 +2337,7 @@ object functions { * @group math_funcs * @since 3.5.0 */ - def ceiling(e: Column, scale: Column): Column = ceil(e, scale) + def ceiling(e: Column, scale: Column): Column = Column.fn("ceiling", e, scale) /** * Computes the ceiling of the given value of `e` to 0 decimal places. @@ -2345,7 +2345,7 @@ object functions { * @group math_funcs * @since 3.5.0 */ - def ceiling(e: Column): Column = ceil(e) + def ceiling(e: Column): Column = Column.fn("ceiling", e) /** * Convert a number in a string column from one base to another. 
@@ -2800,7 +2800,7 @@ object functions { * @group math_funcs * @since 3.5.0 */ - def power(l: Column, r: Column): Column = pow(l, r) + def power(l: Column, r: Column): Column = Column.fn("power", l, r) /** * Returns the positive value of dividend mod divisor. @@ -2937,7 +2937,7 @@ object functions { * @group math_funcs * @since 3.5.0 */ - def sign(e: Column): Column = signum(e) + def sign(e: Column): Column = Column.fn("sign", e) /** * Computes the signum of the given value. @@ -4420,7 +4420,7 @@ object functions { * @since 3.5.0 */ def printf(format: Column, arguments: Column*): Column = -Column.fn("format_string", lit(format) +: arguments: _*) +Column.fn("printf", (format +: arguments): _*) /** * Decodes a `str` in 'application/x-www-form-urlencoded' format using a specific encoding diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/describe.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/describe.explain index f205f7ef7a1..b203f715c71 100644 --- a/connector/connect/common/src/test/resources/query-tests/explain-results/describe.explain +++
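To make the intent of the alias change concrete, here is a hedged PySpark sketch; the previous column name is an assumption based on the old delegation to the underlying function (`sign` delegating to `signum`), not something stated in this email.

```python
# Expected effect of the alias change: sign(col) and expr("sign(col)") should now
# agree on the output column name.
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, sign

spark = SparkSession.builder.getOrCreate()

df = spark.range(1).selectExpr("id - 3 AS x")
out = df.select(sign("x"), expr("sign(x)"))
print(out.columns)  # expected after this change: ['sign(x)', 'sign(x)']; previously the first entry was 'SIGNUM(x)'
```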
[spark] branch master updated (f2a6c97d718 -> d03ebced0ef)
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from f2a6c97d718 [SPARK-44876][PYTHON][FOLLOWUP] Fix Arrow-optimized Python UDF to delay wrapping the function with fail_on_stopiteration add d03ebced0ef [SPARK-45060][SQL] Fix an internal error from `to_char()` on `NULL` format No new revisions were added by this update. Summary of changes: common/utils/src/main/resources/error/error-classes.json | 5 + ...error-conditions-invalid-parameter-value-error-class.md | 4 .../sql/catalyst/expressions/numberFormatExpressions.scala | 8 ++-- .../apache/spark/sql/errors/QueryCompilationErrors.scala | 8 .../scala/org/apache/spark/sql/StringFunctionsSuite.scala | 14 ++ 5 files changed, 37 insertions(+), 2 deletions(-)
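Since the change summary above carries no description, here is a hedged sketch of the expected user-visible effect of SPARK-45060; the error-class detail is inferred from the changed file names and is an assumption, not a quote.

```python
# Expected behavior after the fix (assumption): to_char() with a NULL format raises a
# descriptive AnalysisException (an INVALID_PARAMETER_VALUE-style error) instead of an
# internal error.
from pyspark.sql import SparkSession
from pyspark.errors import AnalysisException

spark = SparkSession.builder.getOrCreate()

try:
    spark.sql("SELECT to_char(454, NULL)").show()
except AnalysisException as e:
    print(e)  # expected: a clear message about the invalid (NULL) format parameter
```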
[spark] branch master updated: [SPARK-44876][PYTHON][FOLLOWUP] Fix Arrow-optimized Python UDF to delay wrapping the function with fail_on_stopiteration
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f2a6c97d718 [SPARK-44876][PYTHON][FOLLOWUP] Fix Arrow-optimized Python UDF to delay wrapping the function with fail_on_stopiteration f2a6c97d718 is described below commit f2a6c97d718839896343feaa520396f328f2f866 Author: Takuya UESHIN AuthorDate: Mon Sep 4 15:24:33 2023 +0800 [SPARK-44876][PYTHON][FOLLOWUP] Fix Arrow-optimized Python UDF to delay wrapping the function with fail_on_stopiteration ### What changes were proposed in this pull request? Fixes Arrow-optimized Python UDF to delay wrapping the function with `fail_on_stopiteration`. Also removed unnecessary verification `verify_result_type`. ### Why are the changes needed? For Arrow-optimized Python UDF, `fail_on_stopiteration` can be applied to only the wrapped function to avoid unnecessary overhead. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added the related test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #42784 from ueshin/issues/SPARK-44876/fail_on_stopiteration. Authored-by: Takuya UESHIN Signed-off-by: Ruifeng Zheng --- python/pyspark/sql/tests/test_udf.py | 15 +++ python/pyspark/worker.py | 22 ++ 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py index 32ea05bd00a..1f895b1780b 100644 --- a/python/pyspark/sql/tests/test_udf.py +++ b/python/pyspark/sql/tests/test_udf.py @@ -1005,6 +1005,21 @@ class BaseUDFTestsMixin(object): with self.subTest(with_b=True, query_no=i): assertDataFrameEqual(df, [Row(0), Row(101)]) +def test_raise_stop_iteration(self): +@udf("int") +def test_udf(a): +if a < 5: +return a +else: +raise StopIteration() + +assertDataFrameEqual( +self.spark.range(5).select(test_udf(col("id"))), [Row(i) for i in range(5)] +) + +with self.assertRaisesRegex(PythonException, "StopIteration"): +self.spark.range(10).select(test_udf(col("id"))).show() + class UDFTests(BaseUDFTestsMixin, ReusedSQLTestCase): @classmethod diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index fff99f1de3d..92bc622775b 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -139,6 +139,7 @@ def wrap_arrow_batch_udf(f, return_type): elif type(return_type) == BinaryType: result_func = lambda r: bytes(r) if r is not None else r # noqa: E731 +@fail_on_stopiteration def evaluate(*args: pd.Series, **kwargs: pd.Series) -> pd.Series: keys = list(kwargs.keys()) len_args = len(args) @@ -151,18 +152,6 @@ def wrap_arrow_batch_udf(f, return_type): ] ) -def verify_result_type(result): -if not hasattr(result, "__len__"): -pd_type = "pandas.DataFrame" if type(return_type) == StructType else "pandas.Series" -raise PySparkTypeError( -error_class="UDF_RETURN_TYPE", -message_parameters={ -"expected": pd_type, -"actual": type(result).__name__, -}, -) -return result - def verify_result_length(result, length): if len(result) != length: raise PySparkRuntimeError( @@ -175,9 +164,7 @@ def wrap_arrow_batch_udf(f, return_type): return result return lambda *a, **kw: ( -verify_result_length( -verify_result_type(evaluate(*a, **kw)), len((list(a) + list(kw.values()))[0]) -), +verify_result_length(evaluate(*a, **kw), len((list(a) + list(kw.values()))[0])), arrow_return_type, ) @@ -562,7 +549,10 @@ def read_single_udf(pickleSer, infile, 
eval_type, runner_conf, udf_index): else: chained_func = chain(chained_func, f) -if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF: +if eval_type in ( +PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF, +PythonEvalType.SQL_ARROW_BATCHED_UDF, +): func = chained_func else: # make sure StopIteration's raised in the user code are not ignored
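For context on what the new test exercises, the sketch below raises a StopIteration inside an Arrow-optimized Python UDF; enabling Arrow per-UDF via `useArrow=True` is an assumption about how to reach this code path, and the exact error wrapping is as described in the commit, not reproduced here.

```python
# With the fix, a StopIteration raised in user code surfaces as a query failure
# (PythonException) rather than being misinterpreted as the end of the data iterator.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf

spark = SparkSession.builder.getOrCreate()

@udf("int", useArrow=True)  # Arrow-optimized Python UDF (assumption: Spark 3.5+)
def bounded(a):
    if a < 5:
        return a
    raise StopIteration()

spark.range(5).select(bounded(col("id"))).show()   # expected to succeed: all values < 5
spark.range(10).select(bounded(col("id"))).show()  # expected to fail with a StopIteration-related PythonException
```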
[spark] branch branch-3.5 updated: [SPARK-44876][PYTHON][FOLLOWUP][3.5] Fix Arrow-optimized Python UDF to delay wrapping the function with fail_on_stopiteration
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new fe3a20a5e23 [SPARK-44876][PYTHON][FOLLOWUP][3.5] Fix Arrow-optimized Python UDF to delay wrapping the function with fail_on_stopiteration fe3a20a5e23 is described below commit fe3a20a5e231fd151b141e72ea8a1090647a Author: Takuya UESHIN AuthorDate: Mon Sep 4 15:25:33 2023 +0800 [SPARK-44876][PYTHON][FOLLOWUP][3.5] Fix Arrow-optimized Python UDF to delay wrapping the function with fail_on_stopiteration ### What changes were proposed in this pull request? This is a backport of https://github.com/apache/spark/pull/42784. Fixes Arrow-optimized Python UDF to delay wrapping the function with `fail_on_stopiteration`. Also removed unnecessary verification `verify_result_type`. ### Why are the changes needed? For Arrow-optimized Python UDF, `fail_on_stopiteration` can be applied to only the wrapped function to avoid unnecessary overhead. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added the related test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #42785 from ueshin/issues/SPARK-44876/3.5/fail_on_stopiteration. Authored-by: Takuya UESHIN Signed-off-by: Ruifeng Zheng --- python/pyspark/sql/tests/test_udf.py | 21 ++--- python/pyspark/worker.py | 22 +++--- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py index 239ff27813b..2f8c1cd2136 100644 --- a/python/pyspark/sql/tests/test_udf.py +++ b/python/pyspark/sql/tests/test_udf.py @@ -24,7 +24,7 @@ import datetime from pyspark import SparkContext, SQLContext from pyspark.sql import SparkSession, Column, Row -from pyspark.sql.functions import udf, assert_true, lit, rand +from pyspark.sql.functions import col, udf, assert_true, lit, rand from pyspark.sql.udf import UserDefinedFunction from pyspark.sql.types import ( StringType, @@ -38,9 +38,9 @@ from pyspark.sql.types import ( TimestampNTZType, DayTimeIntervalType, ) -from pyspark.errors import AnalysisException, PySparkTypeError +from pyspark.errors import AnalysisException, PythonException, PySparkTypeError from pyspark.testing.sqlutils import ReusedSQLTestCase, test_compiled, test_not_compiled_message -from pyspark.testing.utils import QuietTest +from pyspark.testing.utils import QuietTest, assertDataFrameEqual class BaseUDFTestsMixin(object): @@ -898,6 +898,21 @@ class BaseUDFTestsMixin(object): self.assertEquals(row[1], {"a": "b"}) self.assertEquals(row[2], Row(col1=1, col2=2)) +def test_raise_stop_iteration(self): +@udf("int") +def test_udf(a): +if a < 5: +return a +else: +raise StopIteration() + +assertDataFrameEqual( +self.spark.range(5).select(test_udf(col("id"))), [Row(i) for i in range(5)] +) + +with self.assertRaisesRegex(PythonException, "StopIteration"): +self.spark.range(10).select(test_udf(col("id"))).show() + class UDFTests(BaseUDFTestsMixin, ReusedSQLTestCase): @classmethod diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index edbfad4a5dc..d2ea18c45c9 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -154,20 +154,9 @@ def wrap_arrow_batch_udf(f, return_type): elif type(return_type) == BinaryType: result_func = lambda r: bytes(r) if r is not None else r # noqa: E731 +@fail_on_stopiteration def evaluate(*args: pd.Series) -> pd.Series: 
-return pd.Series(result_func(f(*a)) for a in zip(*args)) - -def verify_result_type(result): -if not hasattr(result, "__len__"): -pd_type = "pandas.DataFrame" if type(return_type) == StructType else "pandas.Series" -raise PySparkTypeError( -error_class="UDF_RETURN_TYPE", -message_parameters={ -"expected": pd_type, -"actual": type(result).__name__, -}, -) -return result +return pd.Series([result_func(f(*a)) for a in zip(*args)]) def verify_result_length(result, length): if len(result) != length: @@ -181,7 +170,7 @@ def wrap_arrow_batch_udf(f, return_type): return result return lambda *a: ( -verify_result_length(verify_result_type(evaluate(*a)), len(a[0])), +verify_result_length(evaluate(*a), len(a[0])), arrow_return_type, ) @@ -543,7 +532,10 @@ def read_single_udf(pickleSer, infile, eval_type,