Venki Korukanti created SPARK-35896:
---------------------------------------

             Summary: [SS] Include more granular metrics for stateful operators 
in StreamingQueryProgress
                 Key: SPARK-35896
                 URL: https://issues.apache.org/jira/browse/SPARK-35896
             Project: Spark
          Issue Type: Improvement
          Components: Structured Streaming
    Affects Versions: 3.1.2
            Reporter: Venki Korukanti


Currently the streaming progress is missing a few important stateful operator 
metrics in {{StateOperatorProgress}}. Each stateful operator consists of 
multiple steps. For example, {{flatMapGroupsWithState}} has two major steps: 1) 
processing the input and 2) timeout processing to remove expired entries from 
the state. The main motivation is to track the time taken by each individual 
step (such as timeout processing, watermark processing, etc.) and how much data 
it processed, in order to pinpoint bottlenecks and explain why some 
microbatches are slower than others in the same job.

Below are the final metrics common to all stateful operators (the ones in 
_*bold-italic*_ are newly proposed). These metrics are in 
{{StateOperatorProgress}}, which is part of {{StreamingQueryProgress}}.
 * _*operatorName*_ - State operator name. Can help us identify any operator 
specific slowness and state store usage patterns. Ex. "dedupe" (derived using 
{{StateStoreWriter.shortName}})
 * _numRowsTotal_ - number of rows in the state store across all tasks in a 
stage where the operator has executed.
 * _numRowsUpdated_ - number of rows added to or updated in the store
 * _*allUpdatesTimeMs*_ - time taken to add new rows or update existing state 
store rows across all tasks in a stage where the operator has executed.
 * _*numRowsRemoved*_ - number of rows deleted from state store as part of the 
state cleanup mechanism across all tasks in a stage where the operator has 
executed. This number helps measure the state store deletions and impact on 
checkpoint commit and other latencies.
 * _*allRemovalsTimeMs*_ - time taken to remove rows from the state store as 
part of state cleanup (this also includes iterating through the entire state 
store to find which rows to delete) across all tasks in a stage where the 
operator has executed. If jobs spend significant time here, it may justify a 
better state store layout that reads only the required rows rather than the 
entire state store, as is done currently.
 * _*commitTimeMs*_ - time taken to commit the state store changes to external 
storage for checkpointing. This is cumulative across all tasks in a stage where 
this operator has executed.
 * _*numShufflePartitions*_ - number of shuffle partitions this state operator 
is part of. Currently, metrics such as times are aggregated across all tasks in 
a stage where the operator has executed. Having the number of shuffle 
partitions (which corresponds to the number of tasks) helps us find the average 
task contribution to the metric.
 * _*numStateStores*_ - number of state stores in the operator across all tasks 
in the stage. Some stateful operators have more than one state store (e.g. 
stream-stream join). Tracking this number helps us find correlations between 
state store instances and microbatch latency.
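
As a rough illustration of how the metrics above could be used, the sketch below divides the stage-wide cumulative timings by {{numShufflePartitions}} to estimate the average per-task contribution, then picks the dominant step in each microbatch. It assumes the proposed fields appear as plain keys in each state operator entry; the helper names and all sample values are hypothetical and are not taken from a real job.

```python
# Minimal sketch, assuming each state operator entry exposes the proposed
# fields as dictionary keys. All numbers below are invented for illustration.

TIME_KEYS = ("allUpdatesTimeMs", "allRemovalsTimeMs", "commitTimeMs")


def per_task_averages(op):
    """Divide stage-wide cumulative timings by the number of shuffle partitions."""
    partitions = op["numShufflePartitions"]
    if partitions <= 0:
        raise ValueError("numShufflePartitions must be positive")
    return {key: op[key] / partitions for key in TIME_KEYS}


def slowest_step(op):
    """Return the timing metric with the largest average per-task contribution."""
    averages = per_task_averages(op)
    return max(averages, key=averages.get)


# Hypothetical entries for the same operator in a fast and a slow microbatch.
batch_fast = {
    "operatorName": "dedupe",
    "numRowsTotal": 1_000_000,
    "numRowsUpdated": 5_000,
    "allUpdatesTimeMs": 4_000,
    "numRowsRemoved": 100,
    "allRemovalsTimeMs": 2_000,
    "commitTimeMs": 6_000,
    "numShufflePartitions": 200,
    "numStateStores": 200,
}
batch_slow = dict(
    batch_fast,
    allUpdatesTimeMs=4_400,
    numRowsRemoved=250_000,
    allRemovalsTimeMs=90_000,
    commitTimeMs=12_000,
)

for name, op in (("fast", batch_fast), ("slow", batch_slow)):
    print(name, op["operatorName"], per_task_averages(op), slowest_step(op))
```

In this made-up comparison the slow microbatch is dominated by {{allRemovalsTimeMs}}, which is the kind of signal the proposal is meant to surface.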



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

