This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c808077  [SPARK-37578][SQL] Update task metrics from ds v2 custom metrics
c808077 is described below

commit c808077df15a9268bc2e8c2de1a2714715bb9f43
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Tue Dec 28 22:09:13 2021 -0800

    [SPARK-37578][SQL] Update task metrics from ds v2 custom metrics
    
    ### What changes were proposed in this pull request?
    
    This patch proposes to update task metrics from data source v2 custom metrics.
    
    ### Why are the changes needed?
    
    Task metrics such as `bytesWritten` and `recordsWritten` are already updated for the built-in data source v2 formats, e.g. Parquet.
    For other data source v2 formats, we can define a few built-in metric names so Spark can also recognize them and update the task metrics.
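    
    For illustration (mirroring the new test added in this patch), a minimal sketch of the kind of writer-side metrics a connector might report; the anonymous metric objects and the constant values are purely illustrative:
    
    ```scala
    import org.apache.spark.sql.connector.metric.CustomTaskMetric
    
    // Because the names match the built-in "bytesWritten"/"recordsWritten"
    // metric names, Spark will also copy these values into the task's
    // output metrics in addition to the SQL custom metrics.
    val bytesWrittenMetric = new CustomTaskMetric {
      override def name(): String = "bytesWritten"
      override def value(): Long = 123L   // e.g. bytes flushed so far
    }
    val recordsWrittenMetric = new CustomTaskMetric {
      override def name(): String = "recordsWritten"
      override def value(): Long = 10L    // e.g. rows written so far
    }
    
    // A DataWriter would return these from currentMetricsValues():
    //   override def currentMetricsValues(): Array[CustomTaskMetric] =
    //     Array(bytesWrittenMetric, recordsWrittenMetric)
    ```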
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. DS v2 custom metrics with the special names `bytesWritten` and `recordsWritten` are now also propagated to task metrics.
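    
    As a rough sketch of the user-visible effect (again mirroring the new test), the propagated values become observable through the regular task output metrics, e.g. via a `SparkListener`; `spark` is assumed to be an active `SparkSession`:
    
    ```scala
    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
    
    // Observes the task-level output metrics that the DS v2 custom metrics
    // named "bytesWritten"/"recordsWritten" now feed into.
    val listener = new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        val out = taskEnd.taskMetrics.outputMetrics
        println(s"bytesWritten=${out.bytesWritten}, recordsWritten=${out.recordsWritten}")
      }
    }
    spark.sparkContext.addSparkListener(listener)
    ```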
    
    ### How was this patch tested?
    
    Added unit test.
    
    Closes #35028 from viirya/SPARK-37578.
    
    Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
---
 .../sql/connector/metric/CustomTaskMetric.java     |  4 ++
 .../spark/sql/execution/metric/CustomMetrics.scala | 17 +++++-
 .../execution/ui/SQLAppStatusListenerSuite.scala   | 71 +++++++++++++++++++++-
 3 files changed, 88 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomTaskMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomTaskMetric.java
index 1b6f04d..9e4b75c 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomTaskMetric.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomTaskMetric.java
@@ -29,6 +29,10 @@ import org.apache.spark.sql.connector.read.PartitionReader;
  * The metrics will be gathered during query execution back to the driver and then combined. How
  * the task metrics are combined is defined by corresponding {@link CustomMetric} with same metric
  * name. The final result will be shown up in the data source scan operator in Spark UI.
+ * <p>
+ * There are a few special metric names: "bytesWritten" and "recordsWritten". If the data source
+ * defines custom metrics with the same names, the metric values will also be updated to
+ * corresponding task metrics.
  *
  * @since 3.2.0
  */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala
index e0138b9..2da682a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.metric
 
+import org.apache.spark.TaskContext
 import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
 
 object CustomMetrics {
@@ -24,6 +25,8 @@ object CustomMetrics {
 
   private[spark] val NUM_ROWS_PER_UPDATE = 100
 
+  private[spark] val BUILTIN_OUTPUT_METRICS = Set("bytesWritten", "recordsWritten")
+
   /**
   * Given a class name, builds and returns a metric type for a V2 custom metric class
    * `CustomMetric`.
@@ -52,7 +55,19 @@ object CustomMetrics {
       currentMetricsValues: Seq[CustomTaskMetric],
       customMetrics: Map[String, SQLMetric]): Unit = {
     currentMetricsValues.foreach { metric =>
-      customMetrics.get(metric.name()).map(_.set(metric.value()))
+      val metricName = metric.name()
+      val metricValue = metric.value()
+      customMetrics.get(metricName).map(_.set(metricValue))
+
+      if (BUILTIN_OUTPUT_METRICS.contains(metricName)) {
+        Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach { outputMetrics =>
+          metricName match {
+            case "bytesWritten" => outputMetrics.setBytesWritten(metricValue)
+            case "recordsWritten" => 
outputMetrics.setRecordsWritten(metricValue)
+            case _ => // no-op
+          }
+        }
+      }
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 6123064..8eaeefc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.ui
 
 import java.util.Properties
 
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -900,6 +900,46 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
       assert(innerMetric.isDefined)
     }
   }
+
+  test("SPARK-37578: Update output metrics from Datasource v2") {
+    withTempDir { dir =>
+      val statusStore = spark.sharedState.statusStore
+      val oldCount = statusStore.executionsCount()
+
+      val bytesWritten = new ArrayBuffer[Long]()
+      val recordsWritten = new ArrayBuffer[Long]()
+
+      val bytesWrittenListener = new SparkListener() {
+        override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+          bytesWritten += taskEnd.taskMetrics.outputMetrics.bytesWritten
+          recordsWritten += taskEnd.taskMetrics.outputMetrics.recordsWritten
+        }
+      }
+      spark.sparkContext.addSparkListener(bytesWrittenListener)
+
+      try {
+        val cls = classOf[CustomMetricsDataSource].getName
+        spark.range(0, 10, 1, 2).select('id as 'i, -'id as 'j).write.format(cls)
+          .option("path", dir.getCanonicalPath).mode("append").save()
+
+        // Wait until the new execution is started and being tracked.
+        eventually(timeout(10.seconds), interval(10.milliseconds)) {
+          assert(statusStore.executionsCount() > oldCount)
+        }
+
+        // Wait for listener to finish computing the metrics for the execution.
+        eventually(timeout(10.seconds), interval(10.milliseconds)) {
+          assert(statusStore.executionsList().nonEmpty &&
+            statusStore.executionsList().last.metricValues != null)
+        }
+
+        assert(bytesWritten.sum == 246)
+        assert(recordsWritten.sum == 20)
+      } finally {
+        spark.sparkContext.removeSparkListener(bytesWrittenListener)
+      }
+    }
+  }
 }
 
 
@@ -993,6 +1033,22 @@ class SimpleCustomMetric extends CustomMetric {
   }
 }
 
+class BytesWrittenCustomMetric extends CustomMetric {
+  override def name(): String = "bytesWritten"
+  override def description(): String = "bytesWritten metric"
+  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
+    s"bytesWritten: ${taskMetrics.mkString(", ")}"
+  }
+}
+
+class RecordsWrittenCustomMetric extends CustomMetric {
+  override def name(): String = "recordsWritten"
+  override def description(): String = "recordsWritten metric"
+  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
+    s"recordsWritten: ${taskMetrics.mkString(", ")}"
+  }
+}
+
 // The followings are for custom metrics of V2 data source.
 object CustomMetricReaderFactory extends PartitionReaderFactory {
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
@@ -1046,7 +1102,15 @@ class CustomMetricsCSVDataWriter(fs: FileSystem, file: Path) extends CSVDataWrit
       override def name(): String = "inner_metric"
       override def value(): Long = 54321;
     }
-    Array(metric, innerMetric)
+    val bytesWrittenMetric = new CustomTaskMetric {
+      override def name(): String = "bytesWritten"
+      override def value(): Long = 123;
+    }
+    val recordsWrittenMetric = new CustomTaskMetric {
+      override def name(): String = "recordsWritten"
+      override def value(): Long = 10;
+    }
+    Array(metric, innerMetric, bytesWrittenMetric, recordsWrittenMetric)
   }
 }
 
@@ -1088,7 +1152,8 @@ class CustomMetricsDataSource extends SimpleWritableDataSource {
         }
 
         override def supportedCustomMetrics(): Array[CustomMetric] = {
-          Array(new SimpleCustomMetric, new Outer.InnerCustomMetric)
+          Array(new SimpleCustomMetric, new Outer.InnerCustomMetric,
+            new BytesWrittenCustomMetric, new RecordsWrittenCustomMetric)
         }
       }
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
