This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c6788e5c2fc [SPARK-40711][SQL] Add spill size metrics for window
c6788e5c2fc is described below

commit c6788e5c2fc3ea6e65f9d49f98dec1cd5f2b820d
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Mon Jan 9 12:44:23 2023 +0800

    [SPARK-40711][SQL] Add spill size metrics for window
    
    ### What changes were proposed in this pull request?
    
    Window may spill if a single partition is too large to hold in memory. This PR makes the window operator report a spill size metric.
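    
    As a rough illustration (not part of this commit), a window spill can be forced locally by shrinking the window buffer thresholds and then checking the new "spill size" metric on the Window node in the SQL UI. The config keys below are assumptions based on the windowExecBufferInMemoryThreshold / windowExecBufferSpillThreshold settings read in WindowExec:
    
        // Hypothetical local sketch; the config key names are assumed, verify against SQLConf.
        import org.apache.spark.sql.SparkSession
        import org.apache.spark.sql.expressions.Window
        import org.apache.spark.sql.functions.rank
    
        object WindowSpillDemo {
          def main(args: Array[String]): Unit = {
            val spark = SparkSession.builder()
              .master("local[2]")
              .appName("window-spill-demo")
              .getOrCreate()
            import spark.implicits._
    
            // Force even tiny partitions to spill so the metric becomes visible.
            spark.conf.set("spark.sql.windowExec.buffer.in.memory.threshold", "1")
            spark.conf.set("spark.sql.windowExec.buffer.spill.threshold", "1")
    
            val data = (1 to 1000).map(i => (i % 10, i.toString)).toDF("c1", "c2")
            val w = Window.partitionBy("c1").orderBy("c2")
            data.select(rank().over(w)).collect()
            // Open the SQL tab in the UI: the Window node should now show "spill size".
            spark.stop()
          }
        }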
    
    ### Why are the changes needed?
    
    Help users get window spill information, to track how much data is spilled.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, a new metric. People can see it in the UI.
    
    ### How was this patch tested?
    
    Added a test for the window metric and a manual test for WindowInPandasExec:
    
    Screenshot: https://user-images.githubusercontent.com/12025282/194706054-91c75f5f-e513-40fb-a148-6493d97f8c51.png
    
    Closes #38163 from ulysses-you/window-metrics.
    
    Authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/execution/python/WindowInPandasExec.scala  |  6 ++++++
 .../apache/spark/sql/execution/window/WindowExec.scala   |  6 ++++++
 .../spark/sql/execution/metric/SQLMetricsSuite.scala     | 16 ++++++++++++++++
 3 files changed, 28 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
index dcaffed89cc..5e903aa991d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.window._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.ArrowUtils
@@ -85,6 +86,9 @@ case class WindowInPandasExec(
     orderSpec: Seq[SortOrder],
     child: SparkPlan)
   extends WindowExecBase with PythonSQLMetrics {
+  override lazy val metrics: Map[String, SQLMetric] = pythonMetrics ++ Map(
+    "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size")
+  )
 
   /**
    * Helper functions and data structures for window bounds
@@ -245,6 +249,7 @@ case class WindowInPandasExec(
 
     val allInputs = windowBoundsInput ++ dataInputs
     val allInputTypes = allInputs.map(_.dataType)
+    val spillSize = longMetric("spillSize")
 
     // Start processing.
     child.execute().mapPartitions { iter =>
@@ -337,6 +342,7 @@ case class WindowInPandasExec(
           if (!found) {
             // clear final partition
             buffer.clear()
+            spillSize += buffer.spillSize
           }
           found
         }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
index dc85585b13d..dda5da6c9e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -21,6 +21,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 
 /**
  * This class calculates and outputs (windowed) aggregates over the rows in a single (sorted)
@@ -89,6 +90,9 @@ case class WindowExec(
     orderSpec: Seq[SortOrder],
     child: SparkPlan)
   extends WindowExecBase {
+  override lazy val metrics: Map[String, SQLMetric] = Map(
+    "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size")
+  )
 
   protected override def doExecute(): RDD[InternalRow] = {
     // Unwrap the window expressions and window frame factories from the map.
@@ -96,6 +100,7 @@ case class WindowExec(
     val factories = windowFrameExpressionFactoryPairs.map(_._2).toArray
     val inMemoryThreshold = conf.windowExecBufferInMemoryThreshold
     val spillThreshold = conf.windowExecBufferSpillThreshold
+    val spillSize = longMetric("spillSize")
 
     // Start processing.
     child.execute().mapPartitions { stream =>
@@ -163,6 +168,7 @@ case class WindowExec(
           if (!found) {
             // clear final partition
             buffer.clear()
+            spillSize += buffer.spillSize
           }
           found
         }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 76b5364164e..26e61c6b58d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, InsertIntoHadoopFsRelationCommand, SQLHadoopMapReduceCommitProtocol, V1WriteCommand}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec}
+import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -880,6 +881,21 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
       }
     }
   }
+
+  test("SPARK-40711: Add spill size metrics for window") {
+    val data = Seq((1, "a"), (2, "b")).toDF("c1", "c2")
+    val w = Window.partitionBy("c1").orderBy("c2")
+    val df = data.select(rank().over(w))
+    // Project
+    //   Window
+    //     ...
+    testSparkPlanMetricsWithPredicates(df, 1, Map(
+      1L -> (("Window", Map(
+        "spill size" -> {
+          _.toString.matches(sizeMetricPattern)
+        }))))
+    )
+  }
 }
 
 case class CustomFileCommitProtocol(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
