[ https://issues.apache.org/jira/browse/SPARK-35896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jungtaek Lim resolved SPARK-35896.
----------------------------------
    Fix Version/s: 3.2.0
       Resolution: Fixed

Issue resolved by pull request 33091
[https://github.com/apache/spark/pull/33091]

> [SS] Include more granular metrics for stateful operators in StreamingQueryProgress
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-35896
>                 URL: https://issues.apache.org/jira/browse/SPARK-35896
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 3.1.2
>            Reporter: Venki Korukanti
>            Assignee: Venki Korukanti
>            Priority: Major
>             Fix For: 3.2.0
>
>
> Currently the streaming progress is missing a few important stateful operator
> metrics in {{StateOperatorProgress}}. Each stateful operator consists of
> multiple steps. For example, {{flatMapGroupsWithState}} has two major steps:
> 1) processing the input and 2) timeout processing, which removes expired
> entries from the state. The main motivation is to track the time taken by each
> individual step (such as timeout processing, watermark processing, etc.) and
> how much data each step processes. This makes it possible to pinpoint
> bottlenecks and to reason about why some microbatches are slower than others
> in the same job.
> Below are the final metrics common to all stateful operators (the ones in
> _*bold-italic*_ are newly proposed). These metrics are in
> {{StateOperatorProgress}}, which is part of {{StreamingQueryProgress}}.
> * _*operatorName*_ - Name of the state operator. Helps identify
> operator-specific slowness and state store usage patterns. Example: "dedupe"
> (derived from {{StateStoreWriter.shortName}}).
> * _numRowsTotal_ - number of rows in the state store across all tasks in the
> stage where the operator has executed.
> * _numRowsUpdated_ - number of rows added to or updated in the state store.
> * _*allUpdatesTimeMs*_ - time taken to add new rows or update existing state
> store rows across all tasks in the stage where the operator has executed.
> * _*numRowsRemoved*_ - number of rows deleted from the state store as part of
> the state cleanup mechanism across all tasks in the stage where the operator
> has executed. This number helps measure state store deletions and their
> impact on checkpoint commit and other latencies.
> * _*allRemovalsTimeMs*_ - time taken to remove rows from the state store as
> part of state cleanup (including iterating through the entire state store to
> find which rows to delete) across all tasks in the stage where the operator
> has executed. If jobs spend significant time here, that may justify a better
> state store layout, one that reads only the required rows instead of the
> entire state store as is done currently.
> * _*commitTimeMs*_ - time taken to commit the state store changes to
> external storage for checkpointing. This is cumulative across all tasks in the
> stage where this operator has executed.
> * _*numShufflePartitions*_ - number of shuffle partitions this state
> operator is part of. Metrics such as times are currently aggregated across
> all tasks in the stage where the operator has executed. Knowing the number of
> shuffle partitions (which corresponds to the number of tasks) lets us compute
> the average per-task contribution to each metric.
> * _*numStateStores*_ - number of state stores in the operator across all
> tasks in the stage. Some stateful operators have more than one state store
> (e.g. stream-stream join). Tracking this number helps us find correlations
> between state store instances and microbatch latency.
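The per-task averaging described under numShufflePartitions, plus a quick check of which step dominates an operator's time, can be sketched in Python.

The sketch makes several assumptions:

* Progress is available as a Python dict parsed from the progress JSON. PySpark's StreamingQuery.lastProgress exposes it in this dict form in 3.x releases.
* The dict has a "stateOperators" list whose entries use the field names listed above.
* The helper names per_task_averages, slowest_step and summarize are hypothetical.
* The sample values are illustrative, not measurements.

```python
from typing import Any, Dict, List

# Stage-wide time metrics from the list above. Each is summed across all
# tasks of the stage, and numShufflePartitions gives the task count.
TIME_METRICS = ("allUpdatesTimeMs", "allRemovalsTimeMs", "commitTimeMs")


def per_task_averages(op: Dict[str, Any]) -> Dict[str, float]:
    """Divide each stage-wide time metric by the number of shuffle partitions."""
    tasks = op.get("numShufflePartitions", 0)
    if tasks <= 0:
        # No partition count reported: a per-task average is undefined.
        return {}
    return {name: op.get(name, 0) / tasks for name in TIME_METRICS}


def slowest_step(op: Dict[str, Any]) -> str:
    """Return the name of the time metric with the largest value."""
    return max(TIME_METRICS, key=lambda name: op.get(name, 0))


def summarize(progress: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Summarize every stateful operator in one progress report."""
    summary = []
    for op in progress.get("stateOperators", []):
        summary.append({
            "operatorName": op.get("operatorName", "unknown"),
            "slowestStep": slowest_step(op),
            "perTaskMs": per_task_averages(op),
        })
    return summary


# Hypothetical progress for one microbatch (illustrative values only).
sample_progress = {
    "batchId": 7,
    "stateOperators": [
        {
            "operatorName": "dedupe",
            "numRowsTotal": 1000,
            "numRowsUpdated": 50,
            "allUpdatesTimeMs": 400,
            "numRowsRemoved": 20,
            "allRemovalsTimeMs": 1200,
            "commitTimeMs": 800,
            "numShufflePartitions": 200,
        }
    ],
}

if __name__ == "__main__":
    print(summarize(sample_progress))
```

Running summarize on the reports of a fast and a slow microbatch side by side is one way to see which step accounts for the difference. Note that a simple mean hides skew: one slow task can dominate a stage-wide total while the per-task average still looks small.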
--
This message was sent by Atlassian Jira
(v8.3.4#803005)