[ 
https://issues.apache.org/jira/browse/SPARK-56537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18077581#comment-18077581
 ] 

Dhruv Patel commented on SPARK-56537:
-------------------------------------

Thanks [~kabhwan]! Did the audit -- summary below.
                                                                                
                                                                                
                                               
  The bug surface in StateOperatorProgress: allUpdatesTimeMs, allRemovalsTimeMs 
and commitTimeMs are per-batch but never reset on the no-data path. The other 
three per-batch fields (numRowsUpdated,          
  numRowsRemoved, numRowsDroppedByWatermark) are already handled. The five 
snapshot fields (numRowsTotal, memoryUsedBytes, numShufflePartitions, 
numStateStoreInstances, operatorName) should keep their
  values, as today.                                                             
                                                                                
                                               
                                                                                
                          
  customMetrics is mixed. The trait subtypes already encode most of it: 
StateStoreCustomTimingMetric and StateStoreCustomSumMetric are always 
per-batch. StateStoreCustomSizeMetric is split -- some are       
  snapshots (rocksdbPinnedBlocksMemoryUsage, rocksdbSstFileSize, 
rocksdbNumInternalColFamiliesKeys, rocksdbNumExternalColumnFamilies, 
rocksdbNumInternalColumnFamilies) and some are per-batch byte/file
  counters (rocksdbBytesWritten, rocksdbBytesRead, rocksdbCompactReadBytes, 
rocksdbCompactWrittenBytes, rocksdbFlushWrittenBytes, rocksdbBytesCopied, 
rocksdbZipFileBytesUncompressed,                         
  rocksdbIteratorBytesRead, rocksdbNumReplayChangeLogFiles).                    
                          
                                         
  Proposed plan, want to confirm a few choices:                                 
                                                                                
                                               
  
  1. Reset allUpdatesTimeMs, allRemovalsTimeMs, commitTimeMs unconditionally on 
the no-data path. Straightforward.                                              
                                               
                                                                                
                          
  2. For customMetrics, reset all Sum and Timing entries. For per-batch Size 
byte counters, two options: (a) hardcode a snapshot whitelist in the reset 
routine, or (b) extend StateStoreCustomMetric with an  
  explicit isSnapshot flag (default false) so each provider declares semantics 
at the source. (b) is a slightly bigger change but keeps the truth at the 
metric definition. Any preference?
                                                                                
                                                                                
                                               
  3. StateStoreInstanceMetric out of scope unless you want it -- most are 
already snapshot-ish with ignoreIfUnchanged=true.                               
                                                     
                                         
  4. API shape: leaning towards adding resetForNoExecution() on 
StateOperatorProgress rather than growing copy()'s argument list further (3 
args after SPARK-56464, would become 6+ with this fix). Co-locates 
  the reset logic with the data class. Open to either.                          
                          
                                                                                
                                                                                
                                               
  Master only as you noted, no backport. If you don't have strong opinions on 
(2) and (4), I'll go with (b) + resetForNoExecution(), add tests in the new 
ProgressReporterSuite, and ping you on the PR.

> Some stateful operator metrics are not reset in no batch trigger progress 
> event
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-56537
>                 URL: https://issues.apache.org/jira/browse/SPARK-56537
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 4.2.0
>            Reporter: Jungtaek Lim
>            Priority: Minor
>
> [https://github.com/apache/spark/pull/55331#discussion_r3103933756]
> We reset several metrics (about num rows) on stateful operator metrics for no 
> batch trigger progress event, but still we have more metrics to be audited. 
> This ticket tracks the effort.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to