This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new c808077  [SPARK-37578][SQL] Update task metrics from ds v2 custom metrics
c808077 is described below

commit c808077df15a9268bc2e8c2de1a2714715bb9f43
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Tue Dec 28 22:09:13 2021 -0800

    [SPARK-37578][SQL] Update task metrics from ds v2 custom metrics

    ### What changes were proposed in this pull request?

    This patch proposes to update task metrics from data source v2 custom metrics.

    ### Why are the changes needed?

    Spark already updates task metrics such as `bytesWritten` and `recordsWritten` for the built-in data source v2 formats, e.g. Parquet. For other data source v2 implementations, we can define a few built-in metric names so that Spark also recognizes them and updates the task metrics.

    ### Does this PR introduce _any_ user-facing change?

    Yes. Some special DS v2 custom metrics are now also propagated to task metrics.

    ### How was this patch tested?

    Added unit test.

    Closes #35028 from viirya/SPARK-37578.

    Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
---
 .../sql/connector/metric/CustomTaskMetric.java      |  4 ++
 .../spark/sql/execution/metric/CustomMetrics.scala  | 17 +++++-
 .../execution/ui/SQLAppStatusListenerSuite.scala    | 71 +++++++++++++++++++++-
 3 files changed, 88 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomTaskMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomTaskMetric.java
index 1b6f04d..9e4b75c 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomTaskMetric.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomTaskMetric.java
@@ -29,6 +29,10 @@ import org.apache.spark.sql.connector.read.PartitionReader;
  * The metrics will be gathered during query execution back to the driver and then combined. How
  * the task metrics are combined is defined by corresponding {@link CustomMetric} with same metric
  * name. The final result will be shown up in the data source scan operator in Spark UI.
+ * <p>
+ * There are a few special metric names: "bytesWritten" and "recordsWritten". If the data source
+ * defines custom metrics with the same names, the metric values will also be updated to
+ * corresponding task metrics.
  *
  * @since 3.2.0
  */
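For context (not part of this commit), here is a minimal sketch of the writer side this change targets: a hypothetical DS v2 DataWriter whose currentMetricsValues() reports custom task metrics named "bytesWritten" and "recordsWritten". With this patch, those values are also copied into the task's output metrics. The class name and the byte accounting below are illustrative assumptions, not Spark code.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.metric.CustomTaskMetric
import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}

// Hypothetical writer for some DS v2 format; only the metric reporting matters here.
class ExampleMetricsDataWriter extends DataWriter[InternalRow] {
  private var rowCount = 0L
  private var byteCount = 0L

  override def write(record: InternalRow): Unit = {
    rowCount += 1
    byteCount += 24L  // placeholder; a real writer would track the bytes it actually wrote
  }

  override def commit(): WriterCommitMessage = new WriterCommitMessage {}
  override def abort(): Unit = ()
  override def close(): Unit = ()

  // Because these names match Spark's built-in output metrics, updateMetrics()
  // now also sets the task-level bytesWritten/recordsWritten from these values.
  override def currentMetricsValues(): Array[CustomTaskMetric] = Array(
    new CustomTaskMetric {
      override def name(): String = "bytesWritten"
      override def value(): Long = byteCount
    },
    new CustomTaskMetric {
      override def name(): String = "recordsWritten"
      override def value(): Long = rowCount
    })
}

As in the test changes below, the data source would also declare matching CustomMetric implementations (e.g. the BytesWrittenCustomMetric and RecordsWrittenCustomMetric added to SQLAppStatusListenerSuite) via supportedCustomMetrics(), so the driver knows how to aggregate and display the values in the SQL UI.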
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala
index e0138b9..2da682a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.metric
 
+import org.apache.spark.TaskContext
 import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
 
 object CustomMetrics {
@@ -24,6 +25,8 @@ object CustomMetrics {
 
   private[spark] val NUM_ROWS_PER_UPDATE = 100
 
+  private[spark] val BUILTIN_OUTPUT_METRICS = Set("bytesWritten", "recordsWritten")
+
   /**
    * Given a class name, builds and returns a metric type for a V2 custom metric class
    * `CustomMetric`.
@@ -52,7 +55,19 @@ object CustomMetrics {
       currentMetricsValues: Seq[CustomTaskMetric],
       customMetrics: Map[String, SQLMetric]): Unit = {
     currentMetricsValues.foreach { metric =>
-      customMetrics.get(metric.name()).map(_.set(metric.value()))
+      val metricName = metric.name()
+      val metricValue = metric.value()
+      customMetrics.get(metricName).map(_.set(metricValue))
+
+      if (BUILTIN_OUTPUT_METRICS.contains(metricName)) {
+        Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach { outputMetrics =>
+          metricName match {
+            case "bytesWritten" => outputMetrics.setBytesWritten(metricValue)
+            case "recordsWritten" => outputMetrics.setRecordsWritten(metricValue)
+            case _ => // no-op
+          }
+        }
+      }
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 6123064..8eaeefc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.ui
 
 import java.util.Properties
 
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -900,6 +900,46 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
       assert(innerMetric.isDefined)
     }
   }
+
+  test("SPARK-37578: Update output metrics from Datasource v2") {
+    withTempDir { dir =>
+      val statusStore = spark.sharedState.statusStore
+      val oldCount = statusStore.executionsCount()
+
+      val bytesWritten = new ArrayBuffer[Long]()
+      val recordsWritten = new ArrayBuffer[Long]()
+
+      val bytesWrittenListener = new SparkListener() {
+        override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+          bytesWritten += taskEnd.taskMetrics.outputMetrics.bytesWritten
+          recordsWritten += taskEnd.taskMetrics.outputMetrics.recordsWritten
+        }
+      }
+      spark.sparkContext.addSparkListener(bytesWrittenListener)
+
+      try {
+        val cls = classOf[CustomMetricsDataSource].getName
+        spark.range(0, 10, 1, 2).select('id as 'i, -'id as 'j).write.format(cls)
+          .option("path", dir.getCanonicalPath).mode("append").save()
+
+        // Wait until the new execution is started and being tracked.
+        eventually(timeout(10.seconds), interval(10.milliseconds)) {
+          assert(statusStore.executionsCount() > oldCount)
+        }
+
+        // Wait for listener to finish computing the metrics for the execution.
+        eventually(timeout(10.seconds), interval(10.milliseconds)) {
+          assert(statusStore.executionsList().nonEmpty &&
+            statusStore.executionsList().last.metricValues != null)
+        }
+
+        assert(bytesWritten.sum == 246)
+        assert(recordsWritten.sum == 20)
+      } finally {
+        spark.sparkContext.removeSparkListener(bytesWrittenListener)
+      }
+    }
+  }
 }
@@ -993,6 +1033,22 @@ class SimpleCustomMetric extends CustomMetric {
   }
 }
 
+class BytesWrittenCustomMetric extends CustomMetric {
+  override def name(): String = "bytesWritten"
+  override def description(): String = "bytesWritten metric"
+  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
+    s"bytesWritten: ${taskMetrics.mkString(", ")}"
+  }
+}
+
+class RecordsWrittenCustomMetric extends CustomMetric {
+  override def name(): String = "recordsWritten"
+  override def description(): String = "recordsWritten metric"
+  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
+    s"recordsWritten: ${taskMetrics.mkString(", ")}"
+  }
+}
+
 // The followings are for custom metrics of V2 data source.
 object CustomMetricReaderFactory extends PartitionReaderFactory {
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
@@ -1046,7 +1102,15 @@ class CustomMetricsCSVDataWriter(fs: FileSystem, file: Path) extends CSVDataWrit
       override def name(): String = "inner_metric"
      override def value(): Long = 54321;
    }
-    Array(metric, innerMetric)
+    val bytesWrittenMetric = new CustomTaskMetric {
+      override def name(): String = "bytesWritten"
+      override def value(): Long = 123;
+    }
+    val recordsWrittenMetric = new CustomTaskMetric {
+      override def name(): String = "recordsWritten"
+      override def value(): Long = 10;
+    }
+    Array(metric, innerMetric, bytesWrittenMetric, recordsWrittenMetric)
   }
 }
@@ -1088,7 +1152,8 @@ class CustomMetricsDataSource extends SimpleWritableDataSource {
     }
 
     override def supportedCustomMetrics(): Array[CustomMetric] = {
-      Array(new SimpleCustomMetric, new Outer.InnerCustomMetric)
+      Array(new SimpleCustomMetric, new Outer.InnerCustomMetric,
+        new BytesWrittenCustomMetric, new RecordsWrittenCustomMetric)
     }
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org