[ https://issues.apache.org/jira/browse/SPARK-35896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17369627#comment-17369627 ]

Apache Spark commented on SPARK-35896:
--------------------------------------

User 'vkorukanti' has created a pull request for this issue:
https://github.com/apache/spark/pull/33091

> [SS] Include more granular metrics for stateful operators in StreamingQueryProgress
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-35896
>                 URL: https://issues.apache.org/jira/browse/SPARK-35896
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 3.1.2
>            Reporter: Venki Korukanti
>            Priority: Major
>
> Currently, the streaming progress is missing a few important stateful operator
> metrics in {{StateOperatorProgress}}. Each stateful operator consists of
> multiple steps. For example, {{flatMapGroupsWithState}} has two major steps:
> 1) processing the input and 2) timeout processing, which removes expired
> entries from the state. The main motivation is to track the time taken by
> each individual step (such as timeout processing, watermark processing, etc.)
> and how much data each step processes, in order to pinpoint bottlenecks and
> to reason about why some microbatches are slower than others in the same job.
> Below are the final metrics common to all stateful operators (the ones in
> _*bold-italic*_ are newly proposed). These metrics are in
> {{StateOperatorProgress}}, which is part of {{StreamingQueryProgress}}.
>  * _*operatorName*_ - state operator name. Helps identify operator-specific
> slowness and state store usage patterns. For example, "dedupe" (derived using
> {{StateStoreWriter.shortName}}).
>  * _numRowsTotal_ - number of rows in the state store across all tasks in a 
> stage where the operator has executed.
>  * _numRowsUpdated_ - number of rows added to or updated in the state store.
>  * _*allUpdatesTimeMs*_ - time taken to add new rows or update existing state 
> store rows across all tasks in a stage where the operator has executed.
>  * _*numRowsRemoved*_ - number of rows deleted from the state store as part of
> the state cleanup mechanism across all tasks in a stage where the operator
> has executed. This number helps measure state store deletions and their
> impact on checkpoint commit and other latencies.
>  * _*allRemovalsTimeMs*_ - time taken to remove rows from the state store
> as part of state cleanup (including iterating through the entire state store
> to find which rows to delete) across all tasks in a stage where the operator
> has executed. If jobs spend significant time here, it may justify a better
> state store layout that reads only the required rows rather than the entire
> state store, as is done currently.
>  * _*commitTimeMs*_ - time taken to commit the state store changes to 
> external storage for checkpointing. This is cumulative across all tasks in a 
> stage where this operator has executed.
>  * _*numShufflePartitions*_ - number of shuffle partitions this state
> operator is part of. Currently, metrics such as times are aggregated across
> all tasks in a stage where the operator has executed. Knowing the number of
> shuffle partitions (which corresponds to the number of tasks) helps us find
> the average per-task contribution to each metric.
>  * _*numStateStores*_ - number of state stores in the operator across all
> tasks in the stage. Some stateful operators have more than one state store
> (e.g., stream-stream join). Tracking this number helps us find correlations
> between the number of state store instances and microbatch latency.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
