This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new d9d79a54a3cd [SPARK-48102][SS] Track duration for acquiring source/sink metrics while reporting streaming query progress
d9d79a54a3cd is described below

commit d9d79a54a3cd487380039c88ebe9fa708e0dcf23
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Fri May 3 11:30:58 2024 +0900

[SPARK-48102][SS] Track duration for acquiring source/sink metrics while reporting streaming query progress

### What changes were proposed in this pull request?

Track the duration of acquiring source/sink metrics while reporting streaming query progress.

### Why are the changes needed?

This change helps us understand how long the source/sink progress metrics calculation takes. We also need to understand how that time is distributed when multiple sources are used.

Sample log:
```
17:26:14.769 INFO org.apache.spark.sql.execution.streaming.MicroBatchExecutionContext: Extracting source progress metrics for source=MemoryStream[value#636] took duration_ms=0
17:26:14.769 INFO org.apache.spark.sql.execution.streaming.MicroBatchExecutionContext: Extracting sink progress metrics for sink=MemorySink took duration_ms=0
```
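
These log lines come from wrapping each extraction in Spark's `Utils.timeTakenMs` helper, which evaluates a block and returns the block's result paired with the elapsed wall-clock milliseconds. A minimal sketch of that pattern (simplified for illustration, not the exact Spark implementation; `extractMetrics` is a hypothetical stand-in for the real extraction code):

```scala
import java.util.concurrent.TimeUnit.NANOSECONDS

// Minimal timeTakenMs-style helper: evaluate `body` once and return its
// result paired with the elapsed wall-clock time in milliseconds.
def timeTakenMs[T](body: => T): (T, Long) = {
  val startNs = System.nanoTime()
  val result = body
  (result, math.max(NANOSECONDS.toMillis(System.nanoTime() - startNs), 0L))
}

// Hypothetical stand-in for the source/sink metric extraction being timed.
def extractMetrics(): Map[String, String] = Map("numOutputRows" -> "42")

val (metrics, durationMs) = timeTakenMs { extractMetrics() }
println(s"Extracting progress metrics ($metrics) took duration_ms=$durationMs")
```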

Existing test:
```
[info] Run completed in 9 seconds, 995 milliseconds.
[info] Total number of tests run: 11
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 11, failed 0, canceled 0, ignored 1, pending 0
[info] All tests passed.
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing unit tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #46350 from anishshri-db/task/SPARK-48102.

Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/execution/streaming/ProgressReporter.scala | 54 +++++++++++++---------
 1 file changed, 32 insertions(+), 22 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 6ef6f0eb7118..3842ed574355 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.v2.{MicroBatchScanExec, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress}
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryIdleEvent, QueryProgressEvent}
-import org.apache.spark.util.Clock
+import org.apache.spark.util.{Clock, Utils}
 
 /**
  * Responsible for continually reporting statistics about the amount of data processed as well
@@ -334,33 +334,43 @@ abstract class ProgressContext(
       inputTimeSec: Double,
       processingTimeSec: Double): Seq[SourceProgress] = {
     sources.distinct.map { source =>
-      val numRecords = execStats.flatMap(_.inputRows.get(source)).getOrElse(0L)
-      val sourceMetrics = source match {
-        case withMetrics: ReportsSourceMetrics =>
-          withMetrics.metrics(Optional.ofNullable(latestStreamProgress.get(source).orNull))
-        case _ => Map[String, String]().asJava
+      val (result, duration) = Utils.timeTakenMs {
+        val numRecords = execStats.flatMap(_.inputRows.get(source)).getOrElse(0L)
+        val sourceMetrics = source match {
+          case withMetrics: ReportsSourceMetrics =>
+            withMetrics.metrics(Optional.ofNullable(latestStreamProgress.get(source).orNull))
+          case _ => Map[String, String]().asJava
+        }
+        new SourceProgress(
+          description = source.toString,
+          startOffset = currentTriggerStartOffsets.get(source).orNull,
+          endOffset = currentTriggerEndOffsets.get(source).orNull,
+          latestOffset = currentTriggerLatestOffsets.get(source).orNull,
+          numInputRows = numRecords,
+          inputRowsPerSecond = numRecords / inputTimeSec,
+          processedRowsPerSecond = numRecords / processingTimeSec,
+          metrics = sourceMetrics
+        )
       }
-      new SourceProgress(
-        description = source.toString,
-        startOffset = currentTriggerStartOffsets.get(source).orNull,
-        endOffset = currentTriggerEndOffsets.get(source).orNull,
-        latestOffset = currentTriggerLatestOffsets.get(source).orNull,
-        numInputRows = numRecords,
-        inputRowsPerSecond = numRecords / inputTimeSec,
-        processedRowsPerSecond = numRecords / processingTimeSec,
-        metrics = sourceMetrics
-      )
+      logInfo(s"Extracting source progress metrics for source=${source.toString} took " +
+        s"duration_ms=$duration")
+      result
     }
   }
 
   private def extractSinkProgress(execStats: Option[ExecutionStats]): SinkProgress = {
-    val sinkOutput = execStats.flatMap(_.outputRows)
-    val sinkMetrics = sink match {
-      case withMetrics: ReportsSinkMetrics => withMetrics.metrics()
-      case _ => Map[String, String]().asJava
-    }
+    val (result, duration) = Utils.timeTakenMs {
+      val sinkOutput = execStats.flatMap(_.outputRows)
+      val sinkMetrics = sink match {
+        case withMetrics: ReportsSinkMetrics => withMetrics.metrics()
+        case _ => Map[String, String]().asJava
+      }
 
-    SinkProgress(sink.toString, sinkOutput, sinkMetrics)
+      SinkProgress(sink.toString, sinkOutput, sinkMetrics)
+    }
+    logInfo(s"Extracting sink progress metrics for sink=${sink.toString} took " +
+      s"duration_ms=$duration")
+    result
   }
 
   /**

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org