This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ea3061beedf [SPARK-44340][SPARK-44341][SQL][PYTHON][FOLLOWUP] Set 
partition index correctly for WindowGroupLimitExec, WindowExec, and 
WindowInPandasExec
ea3061beedf is described below

commit ea3061beedf7dc10f14e8de27d540dbcc5894fe7
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Mon Jul 31 13:53:32 2023 +0800

    [SPARK-44340][SPARK-44341][SQL][PYTHON][FOLLOWUP] Set partition index 
correctly for WindowGroupLimitExec, WindowExec, and WindowInPandasExec
    
    ### What changes were proposed in this pull request?
    This is a followup of https://github.com/apache/spark/pull/41899 and 
https://github.com/apache/spark/pull/41939, to set the partition index 
correctly even if it's not used for now. It also contains a few code cleanups.
    
    ### Why are the changes needed?
    future-proof
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    
    ### How was this patch tested?
    existing tests
    
    Closes #42208 from beliefer/SPARK-44340_followup.
    
    Authored-by: Jiaan Geng <belie...@163.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/execution/python/WindowInPandasExec.scala    | 8 +++-----
 .../scala/org/apache/spark/sql/execution/window/WindowExec.scala  | 8 +++-----
 .../apache/spark/sql/execution/window/WindowGroupLimitExec.scala  | 8 +++-----
 3 files changed, 9 insertions(+), 15 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
index ba1f2c132ff..ee0044162b9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
@@ -80,24 +80,22 @@ case class WindowInPandasExec(
   )
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val spillSize = longMetric("spillSize")
-
     val evaluatorFactory =
       new WindowInPandasEvaluatorFactory(
         windowExpression,
         partitionSpec,
         orderSpec,
         child.output,
-        spillSize,
+        longMetric("spillSize"),
         pythonMetrics)
 
     // Start processing.
     if (conf.usePartitionEvaluator) {
       child.execute().mapPartitionsWithEvaluator(evaluatorFactory)
     } else {
-      child.execute().mapPartitions { iter =>
+      child.execute().mapPartitionsWithIndex { (index, rowIterator) =>
         val evaluator = evaluatorFactory.createEvaluator()
-        evaluator.eval(0, iter)
+        evaluator.eval(index, rowIterator)
       }
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
index 35e59aef94f..9ecd1c587a7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -95,23 +95,21 @@ case class WindowExec(
   )
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val spillSize = longMetric("spillSize")
-
     val evaluatorFactory =
       new WindowEvaluatorFactory(
         windowExpression,
         partitionSpec,
         orderSpec,
         child.output,
-        spillSize)
+        longMetric("spillSize"))
 
     // Start processing.
     if (conf.usePartitionEvaluator) {
       child.execute().mapPartitionsWithEvaluator(evaluatorFactory)
     } else {
-      child.execute().mapPartitions { iter =>
+      child.execute().mapPartitionsWithIndex { (index, rowIterator) =>
         val evaluator = evaluatorFactory.createEvaluator()
-        evaluator.eval(0, iter)
+        evaluator.eval(index, rowIterator)
       }
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala
index 98969f60c2b..e975f3b219a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala
@@ -72,8 +72,6 @@ case class WindowGroupLimitExec(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"))
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val numOutputRows = longMetric("numOutputRows")
-
     val evaluatorFactory =
       new WindowGroupLimitEvaluatorFactory(
         partitionSpec,
@@ -81,14 +79,14 @@ case class WindowGroupLimitExec(
         rankLikeFunction,
         limit,
         child.output,
-        numOutputRows)
+        longMetric("numOutputRows"))
 
     if (conf.usePartitionEvaluator) {
       child.execute().mapPartitionsWithEvaluator(evaluatorFactory)
     } else {
-      child.execute().mapPartitionsInternal { iter =>
+      child.execute().mapPartitionsWithIndexInternal { (index, rowIterator) =>
         val evaluator = evaluatorFactory.createEvaluator()
-        evaluator.eval(0, iter)
+        evaluator.eval(index, rowIterator)
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to