This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 398bff77b1c [SPARK-45999][PS] Use dedicated `PandasProduct` in 
`cumprod`
398bff77b1c is described below

commit 398bff77b1c837aed55f4f10ff1157f7da813570
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Mon Nov 20 16:55:09 2023 +0800

    [SPARK-45999][PS] Use dedicated `PandasProduct` in `cumprod`
    
    ### What changes were proposed in this pull request?
    Use dedicated `PandasProduct` in `cumprod`
    
    ### Why are the changes needed?
    to be consistent with the `prod`
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    ci
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #43901 from zhengruifeng/ps_cumprod.
    
    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 python/pyspark/pandas/generic.py |  2 --
 python/pyspark/pandas/series.py  | 34 +++++++---------------------------
 2 files changed, 7 insertions(+), 29 deletions(-)

diff --git a/python/pyspark/pandas/generic.py b/python/pyspark/pandas/generic.py
index 23139762882..77cefb53fe5 100644
--- a/python/pyspark/pandas/generic.py
+++ b/python/pyspark/pandas/generic.py
@@ -331,8 +331,6 @@ class Frame(object, metaclass=ABCMeta):
         return self._apply_series_op(lambda psser: psser._cumsum(skipna), 
should_resolve=True)
 
     # TODO: add 'axis' parameter
-    # TODO: use pandas_udf to support negative values and other options later
-    #  other window except unbounded ones is supported as of Spark 3.0.
     def cumprod(self: FrameLike, skipna: bool = True) -> FrameLike:
         """
         Return cumulative product over a DataFrame or Series axis.
diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index c9beb6432f9..fedc2417a29 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -62,7 +62,6 @@ from pyspark.sql.types import (
     DoubleType,
     FloatType,
     IntegerType,
-    IntegralType,
     LongType,
     NumericType,
     Row,
@@ -6973,36 +6972,17 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
         return psser._cum(F.sum, skipna, part_cols)
 
     def _cumprod(self, skipna: bool, part_cols: Sequence["ColumnOrName"] = ()) 
-> "Series":
-        if isinstance(self.spark.data_type, BooleanType):
-            scol = self._cum(
-                lambda scol: F.min(F.coalesce(scol, F.lit(True))), skipna, 
part_cols
-            ).spark.column.cast(LongType())
-        elif isinstance(self.spark.data_type, NumericType):
-            num_zeros = self._cum(
-                lambda scol: F.sum(F.when(scol == 0, 1).otherwise(0)), skipna, 
part_cols
-            ).spark.column
-            num_negatives = self._cum(
-                lambda scol: F.sum(F.when(scol < 0, 1).otherwise(0)), skipna, 
part_cols
-            ).spark.column
-            sign = F.when(num_negatives % 2 == 0, 1).otherwise(-1)
-
-            abs_prod = F.exp(
-                self._cum(lambda scol: F.sum(F.log(F.abs(scol))), skipna, 
part_cols).spark.column
-            )
-
-            scol = F.when(num_zeros > 0, 0).otherwise(sign * abs_prod)
-
-            if isinstance(self.spark.data_type, IntegralType):
-                scol = F.round(scol).cast(LongType())
-        else:
+        psser = self
+        if isinstance(psser.spark.data_type, BooleanType):
+            psser = psser.spark.transform(lambda scol: scol.cast(LongType()))
+        elif not isinstance(psser.spark.data_type, NumericType):
             raise TypeError(
                 "Could not convert {} ({}) to numeric".format(
-                    spark_type_to_pandas_dtype(self.spark.data_type),
-                    self.spark.data_type.simpleString(),
+                    spark_type_to_pandas_dtype(psser.spark.data_type),
+                    psser.spark.data_type.simpleString(),
                 )
             )
-
-        return self._with_new_scol(scol)
+        return psser._cum(lambda c: SF.product(c, skipna), skipna, part_cols)
 
     # ----------------------------------------------------------------------
     # Accessor Methods


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to