This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d9d79a54a3cd [SPARK-48102][SS] Track duration for acquiring source/sink metrics while reporting streaming query progress
d9d79a54a3cd is described below

commit d9d79a54a3cd487380039c88ebe9fa708e0dcf23
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Fri May 3 11:30:58 2024 +0900

    [SPARK-48102][SS] Track duration for acquiring source/sink metrics while reporting streaming query progress
    
    ### What changes were proposed in this pull request?
    Track the duration of acquiring source/sink metrics while reporting streaming query progress.
    
    ### Why are the changes needed?
    This change helps us understand how long the source/sink progress metrics calculation takes. It also shows how that time is distributed when multiple sources are used.
    
    Sample log:
    ```
    17:26:14.769 INFO org.apache.spark.sql.execution.streaming.MicroBatchExecutionContext: Extracting source progress metrics for source=MemoryStream[value#636] took duration_ms=0
    17:26:14.769 INFO org.apache.spark.sql.execution.streaming.MicroBatchExecutionContext: Extracting sink progress metrics for sink=MemorySink took duration_ms=0
    ```
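
    Since each source and the sink get their own log line, the per-source distribution can be recovered by scanning the driver log. Below is a minimal sketch of such a scan; the ProgressLogScan object and slowExtractions helper are hypothetical names for illustration, not part of this patch:

    ```
    import scala.util.matching.Regex

    object ProgressLogScan {
      // Matches the log lines emitted by this patch, e.g.
      // "... Extracting source progress metrics for source=MemoryStream[value#636] took duration_ms=0"
      private val Line: Regex =
        """Extracting (?:source|sink) progress metrics for (?:source|sink)=(.+) took duration_ms=(\d+)""".r

      /** Returns (target, durationMs) pairs whose duration is at or above the threshold. */
      def slowExtractions(logLines: Seq[String], thresholdMs: Long): Seq[(String, Long)] =
        logLines
          .flatMap(line => Line.findFirstMatchIn(line).map(m => (m.group(1), m.group(2).toLong)))
          .filter { case (_, ms) => ms >= thresholdMs }
    }
    ```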
    
    Existing test:
    ```
    [info] Run completed in 9 seconds, 995 milliseconds.
    [info] Total number of tests run: 11
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 11, failed 0, canceled 0, ignored 1, pending 0
    [info] All tests passed.
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #46350 from anishshri-db/task/SPARK-48102.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/execution/streaming/ProgressReporter.scala | 54 +++++++++++++---------
 1 file changed, 32 insertions(+), 22 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 6ef6f0eb7118..3842ed574355 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.v2.{MicroBatchScanExec, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress}
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryIdleEvent, QueryProgressEvent}
-import org.apache.spark.util.Clock
+import org.apache.spark.util.{Clock, Utils}
 
 /**
  * Responsible for continually reporting statistics about the amount of data processed as well
@@ -334,33 +334,43 @@ abstract class ProgressContext(
       inputTimeSec: Double,
       processingTimeSec: Double): Seq[SourceProgress] = {
     sources.distinct.map { source =>
-      val numRecords = execStats.flatMap(_.inputRows.get(source)).getOrElse(0L)
-      val sourceMetrics = source match {
-        case withMetrics: ReportsSourceMetrics =>
-          withMetrics.metrics(Optional.ofNullable(latestStreamProgress.get(source).orNull))
-        case _ => Map[String, String]().asJava
+      val (result, duration) = Utils.timeTakenMs {
+        val numRecords = execStats.flatMap(_.inputRows.get(source)).getOrElse(0L)
+        val sourceMetrics = source match {
+          case withMetrics: ReportsSourceMetrics =>
+            withMetrics.metrics(Optional.ofNullable(latestStreamProgress.get(source).orNull))
+          case _ => Map[String, String]().asJava
+        }
+        new SourceProgress(
+          description = source.toString,
+          startOffset = currentTriggerStartOffsets.get(source).orNull,
+          endOffset = currentTriggerEndOffsets.get(source).orNull,
+          latestOffset = currentTriggerLatestOffsets.get(source).orNull,
+          numInputRows = numRecords,
+          inputRowsPerSecond = numRecords / inputTimeSec,
+          processedRowsPerSecond = numRecords / processingTimeSec,
+          metrics = sourceMetrics
+        )
       }
-      new SourceProgress(
-        description = source.toString,
-        startOffset = currentTriggerStartOffsets.get(source).orNull,
-        endOffset = currentTriggerEndOffsets.get(source).orNull,
-        latestOffset = currentTriggerLatestOffsets.get(source).orNull,
-        numInputRows = numRecords,
-        inputRowsPerSecond = numRecords / inputTimeSec,
-        processedRowsPerSecond = numRecords / processingTimeSec,
-        metrics = sourceMetrics
-      )
+      logInfo(s"Extracting source progress metrics for 
source=${source.toString} took " +
+        s"duration_ms=$duration")
+      result
     }
   }
 
   private def extractSinkProgress(execStats: Option[ExecutionStats]): SinkProgress = {
-    val sinkOutput = execStats.flatMap(_.outputRows)
-    val sinkMetrics = sink match {
-      case withMetrics: ReportsSinkMetrics => withMetrics.metrics()
-      case _ => Map[String, String]().asJava
-    }
+    val (result, duration) = Utils.timeTakenMs {
+      val sinkOutput = execStats.flatMap(_.outputRows)
+      val sinkMetrics = sink match {
+        case withMetrics: ReportsSinkMetrics => withMetrics.metrics()
+        case _ => Map[String, String]().asJava
+      }
 
-    SinkProgress(sink.toString, sinkOutput, sinkMetrics)
+      SinkProgress(sink.toString, sinkOutput, sinkMetrics)
+    }
+    logInfo(s"Extracting sink progress metrics for sink=${sink.toString} took 
" +
+      s"duration_ms=$duration")
+    result
   }
 
   /**

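The new code relies on Spark's internal Utils.timeTakenMs helper, which evaluates a block and returns its result paired with the elapsed wall-clock milliseconds. For readers outside the Spark codebase, here is a minimal standalone sketch of the same pattern; TimingSketch is a hypothetical stand-in, not the internal implementation:

```
object TimingSketch {
  // Evaluate `body` once and return its result together with the elapsed
  // wall-clock time in milliseconds, mirroring the (result, duration) pair
  // destructured in the patch above.
  def timeTakenMs[T](body: => T): (T, Long) = {
    val startNs = System.nanoTime()
    val result = body
    (result, (System.nanoTime() - startNs) / 1000000L)
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for extracting per-source progress metrics.
    val (metrics, duration) = timeTakenMs {
      Map("numInputRows" -> "42")
    }
    println(s"Extracting source progress metrics took duration_ms=$duration ($metrics)")
  }
}
```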
