DHRUV6029 opened a new pull request, #55699:
URL: https://github.com/apache/spark/pull/55699

   ### What changes were proposed in this pull request?
   
   This PR is a follow-up to 
[SPARK-56464](https://issues.apache.org/jira/browse/SPARK-56464) (commit 
`930c3039871`), which left a `TODO(SPARK-56537)` in 
`ProgressReporter#resetExecStatsForNoExecution` to track the remaining 
per-batch fields on `StateOperatorProgress` that were not being reset on 
no-data trigger progress events.
   
   Three changes:
   
   1. **Reset the per-batch time fields on no-data trigger progress events.** 
`allUpdatesTimeMs`, `allRemovalsTimeMs`, and `commitTimeMs` are now reset to 0 
alongside the row-count fields (`numRowsUpdated`, `numRowsRemoved`, 
`numRowsDroppedByWatermark`) that were already handled by SPARK-56464.
   
   2. **Reset per-batch entries of `customMetrics` while preserving snapshot 
entries.** `StateOperatorProgress.customMetrics` carries values from two metric 
registries (`StateStoreCustomMetric` for provider-level, 
`StatefulOperatorCustomMetric` for operator-level) and conflates per-batch 
counters/timings with snapshot reads of state-store status (current memory 
usage, key counts, file size). On a no-data trigger we now zero per-batch 
entries and preserve snapshot entries.
   
      The snapshot/per-batch distinction is encoded at the metric definition 
via a new `isSnapshot: Boolean` flag on `StateStoreCustomMetric` (default 
`false`). The six snapshot Size metrics are marked at their definitions:
      - RocksDB (5): `rocksdbSstFileSize`, `rocksdbPinnedBlocksMemoryUsage`, 
`rocksdbNumInternalColFamiliesKeys`, `rocksdbNumExternalColumnFamilies`, 
`rocksdbNumInternalColumnFamilies`.
      - HDFSBackedStateStoreProvider (1): `stateOnCurrentVersionSizeBytes`.
   
      `StateStoreCustomTimingMetric` and `StateStoreCustomSumMetric` keep using 
the trait default (always per-batch). Operator-level 
`StatefulOperatorCustomSumMetric` instances (declared by 
`BaseStreamingDeduplicateExec`, `StreamingSymmetricHashJoinExec`, and 
`TransformWithStateExecBase`) are also always per-batch.
   
   3. **Centralize the reset semantics in a new `copyForNoExecution()` method 
on `StateOperatorProgress`** instead of growing `copy(...)`'s parameter list 
further. The method takes no parameters; it inspects the operator instance's 
`snapshotCustomMetricNames` (a new `private[spark]` constructor field, 
defaulted to `Set.empty`, populated at progress build time by 
`StateStoreWriter.getProgress`) to decide which `customMetrics` keys to 
preserve. The existing 3-arg `copy(newNumRowsUpdated, 
newNumRowsDroppedByWatermark, newNumRowsRemoved)` signature is unchanged; its 
body is updated to thread `snapshotCustomMetricNames` through so that the 
`SessionWindowStateStoreSaveExec.getProgress` round-trip preserves it.
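
   To make the snapshot flag from change 2 concrete, here is a minimal sketch of how a definition-level `isSnapshot` flag could yield the set of names to preserve. The `*Sketch` types and the `SnapshotFlagSketch` object are hypothetical stand-ins, not Spark's actual classes, and the pairing of the per-batch metric names with particular metric kinds is illustrative only.

```scala
// Simplified stand-ins for illustration only; these are not Spark's classes.
sealed trait CustomMetricSketch {
  def name: String
  // Trait default: a metric is per-batch unless its definition says otherwise.
  def isSnapshot: Boolean = false
}

// Size metrics may opt in to snapshot semantics at the definition site.
final case class SizeMetricSketch(name: String, override val isSnapshot: Boolean = false)
  extends CustomMetricSketch

// Timing and sum metrics keep the trait default (always per-batch).
final case class TimingMetricSketch(name: String) extends CustomMetricSketch
final case class SumMetricSketch(name: String) extends CustomMetricSketch

object SnapshotFlagSketch {
  // A small subset of the metric names mentioned in this PR description.
  val supportedMetrics: Seq[CustomMetricSketch] = Seq(
    SizeMetricSketch("rocksdbSstFileSize", isSnapshot = true),
    SizeMetricSketch("rocksdbPinnedBlocksMemoryUsage", isSnapshot = true),
    SizeMetricSketch("rocksdbTotalBytesWritten"), // per-batch (kind assumed)
    TimingMetricSketch("rocksdbCommitFlushLatency"),
    SumMetricSketch("rocksdbPutCount"))

  // Names to preserve on an idle trigger, derived from the definitions
  // rather than from a hardcoded list in the reset routine.
  val snapshotCustomMetricNames: Set[String] =
    supportedMetrics.filter(_.isSnapshot).map(_.name).toSet
}
```

   Deriving the set from the definitions means a newly added snapshot metric only needs the flag set where it is declared; the reset routine itself does not have to change.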
   
   The `TODO(SPARK-56537)` comment is removed from 
`ProgressReporter#resetExecStatsForNoExecution`, whose body is reduced to a 
single delegating map: 
`originExecStats.stateOperators.map(_.copyForNoExecution())`.
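
   The reset semantics described above can be sketched roughly as follows. `OperatorProgressSketch` is a hypothetical, trimmed-down stand-in for `StateOperatorProgress`: the real class has more fields, and this sketch uses a Scala `Map[String, Long]` for `customMetrics` purely for brevity. It is an approximation of the behavior described in this PR, not the actual implementation.

```scala
// Hypothetical, trimmed-down stand-in for StateOperatorProgress.
final case class OperatorProgressSketch(
    numRowsTotal: Long,
    numRowsUpdated: Long,
    numRowsRemoved: Long,
    numRowsDroppedByWatermark: Long,
    allUpdatesTimeMs: Long,
    allRemovalsTimeMs: Long,
    commitTimeMs: Long,
    memoryUsedBytes: Long,
    customMetrics: Map[String, Long],
    snapshotCustomMetricNames: Set[String] = Set.empty) {

  // Zero every per-batch field; keep snapshot fields and snapshot custom metrics.
  def copyForNoExecution(): OperatorProgressSketch = copy(
    numRowsUpdated = 0L,
    numRowsRemoved = 0L,
    numRowsDroppedByWatermark = 0L,
    allUpdatesTimeMs = 0L,
    allRemovalsTimeMs = 0L,
    commitTimeMs = 0L,
    customMetrics = customMetrics.map { case (key, value) =>
      key -> (if (snapshotCustomMetricNames.contains(key)) value else 0L)
    })
}

object NoExecutionDemo {
  // Progress as it might look right after a data batch (values are made up).
  val afterDataBatch: OperatorProgressSketch = OperatorProgressSketch(
    numRowsTotal = 10L,
    numRowsUpdated = 4L,
    numRowsRemoved = 1L,
    numRowsDroppedByWatermark = 2L,
    allUpdatesTimeMs = 35L,
    allRemovalsTimeMs = 7L,
    commitTimeMs = 12L,
    memoryUsedBytes = 2048L,
    customMetrics = Map("rocksdbPutCount" -> 4L, "rocksdbSstFileSize" -> 512L),
    snapshotCustomMetricNames = Set("rocksdbSstFileSize"))

  // Mirrors the shape of the single delegating map over stateOperators.
  val idle: Seq[OperatorProgressSketch] =
    Seq(afterDataBatch).map(_.copyForNoExecution())
}
```

   In this sketch `snapshotCustomMetricNames` survives the copy unchanged, so repeated idle triggers keep applying the same split.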
   
   ### Why are the changes needed?
   
   Today, on a no-data ("idle") trigger progress event, `StateOperatorProgress` 
carries the previous batch's values for `allUpdatesTimeMs`, 
`allRemovalsTimeMs`, `commitTimeMs`, and most of `customMetrics`. To a user 
reading `query.lastProgress` / `query.recentProgress` during an idle period 
this looks like work was performed when none was. It is also a known source of 
test flakiness.
   
   The `TODO(SPARK-56537)` left by SPARK-56464 in 
`ProgressReporter#resetExecStatsForNoExecution` explicitly tracks this 
follow-up.
   
   The design was discussed on the JIRA ticket and confirmed before 
implementation:
   - Encode snapshot semantics at the metric definition (option (2b) in the 
audit comment), not via a hardcoded whitelist in the reset routine.
   - Add a new `copyForNoExecution()` method on `StateOperatorProgress` rather 
than growing the existing `copy(...)` argument list further (3 args after 
SPARK-56464 would have become 6+).
   
   ### Does this PR introduce _any_ user-facing change?
   
   No public API change.
   
   User-visible behavior change: idle-trigger progress events emitted via 
`StreamingQueryListener.QueryProgressEvent`, `query.lastProgress`, and 
`query.recentProgress` will now report `0` for all per-batch fields and 
per-batch `customMetrics` entries instead of carrying stale values from the 
previous data batch. Snapshot fields (`numRowsTotal`, `memoryUsedBytes`, 
`numShufflePartitions`, `numStateStoreInstances`, snapshot custom metrics) are 
unchanged. This follows the same direction as the SPARK-56464 fix and 
completes the audit.
   
   ### How was this patch tested?
   
   New and updated tests in `ProgressReporterSuite.scala`:
   
   1. Extended the SPARK-56464 test with assertions that the three time fields 
(`allUpdatesTimeMs`, `allRemovalsTimeMs`, `commitTimeMs`) are reset to 0 on the 
idle trigger, alongside the existing row-count assertions. The test description 
is updated from "no-data batch resets numRowsRemoved to zero" to "no-data batch 
resets all per-batch StateOperatorProgress fields to zero" to reflect the 
broader scope.
   
   2. New test "SPARK-56537: no-data batch resets per-batch customMetrics but 
preserves snapshot customMetrics (RocksDB)" exercising the per-batch / snapshot 
split end-to-end against a real `RocksDBStateStoreProvider`. The test runs one 
data batch, advances the manual clock to trigger an idle progress event, then 
asserts that 3 per-batch RocksDB metrics (`rocksdbCommitFlushLatency`, 
`rocksdbPutCount`, `rocksdbTotalBytesWritten`) are reset to 0 on idle, while 5 
snapshot RocksDB metrics (`rocksdbPinnedBlocksMemoryUsage`, 
`rocksdbNumInternalColFamiliesKeys`, `rocksdbNumExternalColumnFamilies`, 
`rocksdbNumInternalColumnFamilies`, `rocksdbSstFileSize`) are preserved across 
the idle trigger.
   
   Local verification:
   - `build/sbt 'sql/testOnly *ProgressReporterSuite'` -> 2/2 tests pass.
   - `build/sbt 'sql/testOnly *ProgressReporterSuite 
*StreamingQueryStatusAndProgressSuite *StreamingAggregationSuite 
*StreamingDeduplicationSuite *MultiStatefulOperatorsSuite'` -> 240 tests pass 
in 7m 16s.
   - `dev/mima` -> no exclusions required.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Generated-by: Claude Opus 4.7


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

