This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4686c2733702 [SPARK-45073][PS][CONNECT] Replace `LastNonNull` with `Last(ignoreNulls=True)`
4686c2733702 is described below

commit 4686c27337025dd1a616da73b19abe7ea00a4624
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Tue Sep 5 11:13:35 2023 +0800

    [SPARK-45073][PS][CONNECT] Replace `LastNonNull` with `Last(ignoreNulls=True)`
    
    ### What changes were proposed in this pull request?
    Replace `LastNonNull` with `Last(ignoreNulls=True)`
    
    ### Why are the changes needed?
    
    https://github.com/apache/spark/pull/36127 introduced a PS-dedicated
    expression, `LastNonNull`, which is not actually needed: it can be
    replaced with the built-in `Last`.
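
    As a sketch of the one-to-one swap at the call site in
    `python/pyspark/pandas/series.py` (`scol` is the input `Column` there;
    the keyword spelling below is equivalent to the positional `True` in
    the diff):

        from pyspark.sql import functions as F

        # before: SF.last_non_null(scol)   # dedicated LastNonNull expression
        # after:
        last_non_null = F.last(scol, ignorenulls=True)  # built-in Last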
    
    ### Does this PR introduce _any_ user-facing change?
    NO
    
    ### How was this patch tested?
    CI
    
    ### Was this patch authored or co-authored using generative AI tooling?
    NO
    
    Closes #42808 from zhengruifeng/del_last_not_none.
    
    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 .../sql/connect/planner/SparkConnectPlanner.scala  |  4 ---
 python/pyspark/pandas/series.py                    |  2 +-
 python/pyspark/pandas/spark/functions.py           | 14 --------
 .../catalyst/expressions/windowExpressions.scala   | 37 ----------------------
 .../spark/sql/api/python/PythonSQLUtils.scala      |  2 --
 5 files changed, 1 insertion(+), 58 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 579b378d09f6..1a63c9fc27c6 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -1905,10 +1905,6 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
         val ignoreNA = extractBoolean(children(2), "ignoreNA")
         Some(EWM(children(0), alpha, ignoreNA))
 
-      case "last_non_null" if fun.getArgumentsCount == 1 =>
-        val children = fun.getArgumentsList.asScala.map(transformExpression)
-        Some(LastNonNull(children(0)))
-
       case "null_index" if fun.getArgumentsCount == 1 =>
         val children = fun.getArgumentsList.asScala.map(transformExpression)
         Some(NullIndex(children(0)))
diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index 7fa08c6d9b24..863e98c42ead 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -2257,7 +2257,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
             return self._psdf.copy()._psser_for(self._column_label)
 
         scol = self.spark.column
-        last_non_null = SF.last_non_null(scol)
+        last_non_null = F.last(scol, True)
         null_index = SF.null_index(scol)
 
         Window = get_window_class()
diff --git a/python/pyspark/pandas/spark/functions.py b/python/pyspark/pandas/spark/functions.py
index d6f6c6fdeebc..b0bc6efcd56e 100644
--- a/python/pyspark/pandas/spark/functions.py
+++ b/python/pyspark/pandas/spark/functions.py
@@ -159,20 +159,6 @@ def ewm(col: Column, alpha: float, ignore_na: bool) -> Column:
         return Column(sc._jvm.PythonSQLUtils.ewm(col._jc, alpha, ignore_na))
 
 
-def last_non_null(col: Column) -> Column:
-    if is_remote():
-        from pyspark.sql.connect.functions import _invoke_function_over_columns
-
-        return _invoke_function_over_columns(  # type: ignore[return-value]
-            "last_non_null",
-            col,  # type: ignore[arg-type]
-        )
-
-    else:
-        sc = SparkContext._active_spark_context
-        return Column(sc._jvm.PythonSQLUtils.lastNonNull(col._jc))
-
-
 def null_index(col: Column) -> Column:
     if is_remote():
         from pyspark.sql.connect.functions import _invoke_function_over_columns
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
index 50c98c01645d..bc61170f567f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -1152,43 +1152,6 @@ case class EWM(input: Expression, alpha: Double, ignoreNA: Boolean)
 }
 
 
-/**
- * Keep the last non-null value seen if any. This expression is dedicated only for
- * Pandas API on Spark.
- * For example,
- *  Input: null, 1, 2, 3, null, 4, 5, null
- *  Output: null, 1, 2, 3, 3, 4, 5, 5
- */
-case class LastNonNull(input: Expression)
-  extends AggregateWindowFunction with UnaryLike[Expression] {
-
-  override def dataType: DataType = input.dataType
-
-  private lazy val last = AttributeReference("last", dataType, nullable = true)()
-
-  override def aggBufferAttributes: Seq[AttributeReference] = last :: Nil
-
-  override lazy val initialValues: Seq[Expression] = Seq(Literal.create(null, dataType))
-
-  override lazy val updateExpressions: Seq[Expression] = {
-    Seq(
-      /* last = */ If(IsNull(input), last, input)
-    )
-  }
-
-  override lazy val evaluateExpression: Expression = last
-
-  override def prettyName: String = "last_non_null"
-
-  override def sql: String = s"$prettyName(${input.sql})"
-
-  override def child: Expression = input
-
-  override protected def withNewChildInternal(newChild: Expression): LastNonNull =
-    copy(input = newChild)
-}
-
-
 /**
  * Return the indices for consecutive null values, for non-null values, it returns 0.
  * This expression is dedicated only for Pandas API on Spark.
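
A minimal runnable sketch (assuming only a local SparkSession; `idx` is an
illustrative ordering column) showing that the built-in `last` with
`ignorenulls=True` over a running frame reproduces the example in the removed
`LastNonNull` docstring:

    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.window import Window

    spark = SparkSession.builder.getOrCreate()

    # Input from the removed docstring: null, 1, 2, 3, null, 4, 5, null
    df = spark.createDataFrame(
        list(enumerate([None, 1, 2, 3, None, 4, 5, None])),
        "idx INT, value INT",
    )

    # Running frame: from the first row through the current row, in input order.
    w = Window.orderBy("idx").rowsBetween(Window.unboundedPreceding, Window.currentRow)

    # last(..., ignorenulls=True) keeps the most recent non-null value, which is
    # exactly LastNonNull's update rule If(IsNull(input), last, input).
    result = df.withColumn("filled", F.last("value", ignorenulls=True).over(w))
    result.orderBy("idx").show()
    # filled: null, 1, 2, 3, 3, 4, 5, 5
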
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
index 196377cce2ae..3f0e9369c619 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
@@ -145,8 +145,6 @@ private[sql] object PythonSQLUtils extends Logging {
   def ewm(e: Column, alpha: Double, ignoreNA: Boolean): Column =
     Column(EWM(e.expr, alpha, ignoreNA))
 
-  def lastNonNull(e: Column): Column = Column(LastNonNull(e.expr))
-
   def nullIndex(e: Column): Column = Column(NullIndex(e.expr))
 
   def makeInterval(unit: String, e: Column): Column = {
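
For completeness, the same built-in is reachable from plain SQL, so no dedicated
Py4J bridge like the removed `lastNonNull` helper is required. A hedged sketch
(table and column names are illustrative; reuses the `spark` session from the
sketch above):

    spark.createDataFrame(
        [(0, None), (1, 1), (2, None), (3, 2)], "idx INT, value INT"
    ).createOrReplaceTempView("t")

    spark.sql("""
        SELECT idx,
               last(value, true) OVER (
                 ORDER BY idx
                 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
               ) AS filled
        FROM t
    """).show()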


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
