This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new c6788e5c2fc [SPARK-40711][SQL] Add spill size metrics for window c6788e5c2fc is described below commit c6788e5c2fc3ea6e65f9d49f98dec1cd5f2b820d Author: ulysses-you <ulyssesyo...@gmail.com> AuthorDate: Mon Jan 9 12:44:23 2023 +0800 [SPARK-40711][SQL] Add spill size metrics for window ### What changes were proposed in this pull request? Window may spill if one partition is too large to hold in memory. This PR makes window support reporting spill size metrics. ### Why are the changes needed? Helps users get window spill information, to track how much data would spill. ### Does this PR introduce _any_ user-facing change? Yes, a new metric. People can see it in the UI. ### How was this patch tested? Added a test for window and a manual test for WindowInPandasExec: <img width="396" alt="image" src="https://user-images.githubusercontent.com/12025282/194706054-91c75f5f-e513-40fb-a148-6493d97f8c51.png"> Closes #38163 from ulysses-you/window-metrics. 
Authored-by: ulysses-you <ulyssesyo...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/execution/python/WindowInPandasExec.scala | 6 ++++++ .../apache/spark/sql/execution/window/WindowExec.scala | 6 ++++++ .../spark/sql/execution/metric/SQLMetricsSuite.scala | 16 ++++++++++++++++ 3 files changed, 28 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala index dcaffed89cc..5e903aa991d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan} +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.execution.window._ import org.apache.spark.sql.types._ import org.apache.spark.sql.util.ArrowUtils @@ -85,6 +86,9 @@ case class WindowInPandasExec( orderSpec: Seq[SortOrder], child: SparkPlan) extends WindowExecBase with PythonSQLMetrics { + override lazy val metrics: Map[String, SQLMetric] = pythonMetrics ++ Map( + "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size") + ) /** * Helper functions and data structures for window bounds @@ -245,6 +249,7 @@ case class WindowInPandasExec( val allInputs = windowBoundsInput ++ dataInputs val allInputTypes = allInputs.map(_.dataType) + val spillSize = longMetric("spillSize") // Start processing. 
child.execute().mapPartitions { iter => @@ -337,6 +342,7 @@ case class WindowInPandasExec( if (!found) { // clear final partition buffer.clear() + spillSize += buffer.spillSize } found } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala index dc85585b13d..dda5da6c9e9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala @@ -21,6 +21,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan} +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} /** * This class calculates and outputs (windowed) aggregates over the rows in a single (sorted) @@ -89,6 +90,9 @@ case class WindowExec( orderSpec: Seq[SortOrder], child: SparkPlan) extends WindowExecBase { + override lazy val metrics: Map[String, SQLMetric] = Map( + "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size") + ) protected override def doExecute(): RDD[InternalRow] = { // Unwrap the window expressions and window frame factories from the map. @@ -96,6 +100,7 @@ case class WindowExec( val factories = windowFrameExpressionFactoryPairs.map(_._2).toArray val inMemoryThreshold = conf.windowExecBufferInMemoryThreshold val spillThreshold = conf.windowExecBufferSpillThreshold + val spillSize = longMetric("spillSize") // Start processing. 
child.execute().mapPartitions { stream => @@ -163,6 +168,7 @@ case class WindowExec( if (!found) { // clear final partition buffer.clear() + spillSize += buffer.spillSize } found } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 76b5364164e..26e61c6b58d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, InsertIntoHadoopFsRelationCommand, SQLHadoopMapReduceCommitProtocol, V1WriteCommand} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec} +import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -880,6 +881,21 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils } } } + + test("SPARK-40711: Add spill size metrics for window") { + val data = Seq((1, "a"), (2, "b")).toDF("c1", "c2") + val w = Window.partitionBy("c1").orderBy("c2") + val df = data.select(rank().over(w)) + // Project + // Window + // ... + testSparkPlanMetricsWithPredicates(df, 1, Map( + 1L -> (("Window", Map( + "spill size" -> { + _.toString.matches(sizeMetricPattern) + })))) + ) + } } case class CustomFileCommitProtocol( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org