DHRUV6029 opened a new pull request, #55699:
URL: https://github.com/apache/spark/pull/55699
### What changes were proposed in this pull request?

This PR is a follow-up to [SPARK-56464](https://issues.apache.org/jira/browse/SPARK-56464) (commit `930c3039871`). That change left a `TODO(SPARK-56537)` in `ProgressReporter#resetExecStatsForNoExecution` to track the remaining per-batch fields on `StateOperatorProgress` that were not being reset on no-data trigger progress events.

It makes three changes:

1. **Reset the per-batch time fields on no-data trigger progress events.** `allUpdatesTimeMs`, `allRemovalsTimeMs`, and `commitTimeMs` are now reset to 0, alongside the row-count fields (`numRowsUpdated`, `numRowsRemoved`, `numRowsDroppedByWatermark`) that SPARK-56464 already handled.

2. **Reset per-batch entries of `customMetrics` while preserving snapshot entries.** `StateOperatorProgress.customMetrics` carries values from two metric registries (`StateStoreCustomMetric` at the provider level and `StatefulOperatorCustomMetric` at the operator level), and it mixes per-batch counters and timings with snapshot reads of state-store status (current memory usage, key counts, file size). On a no-data trigger, per-batch entries are now zeroed and snapshot entries are preserved.

   The snapshot/per-batch distinction is encoded at the metric definition via a new `isSnapshot: Boolean` flag on `StateStoreCustomMetric` (default `false`). The six snapshot size metrics are marked at their definitions:
   - RocksDB (5): `rocksdbSstFileSize`, `rocksdbPinnedBlocksMemoryUsage`, `rocksdbNumInternalColFamiliesKeys`, `rocksdbNumExternalColumnFamilies`, `rocksdbNumInternalColumnFamilies`.
   - HDFSBackedStateStoreProvider (1): `stateOnCurrentVersionSizeBytes`.

   `StateStoreCustomTimingMetric` and `StateStoreCustomSumMetric` keep the trait default (always per-batch). Operator-level `StatefulOperatorCustomSumMetric` instances (declared by `BaseStreamingDeduplicateExec`, `StreamingSymmetricHashJoinExec`, and `TransformWithStateExecBase`) are also always per-batch.

3. **Centralize the reset semantics in a new `copyForNoExecution()` method on `StateOperatorProgress`** instead of growing the parameter list of `copy(...)` further. The method takes no parameters. It inspects the operator instance's `snapshotCustomMetricNames` (a new `private[spark]` constructor field that defaults to `Set.empty` and is populated at progress build time by `StateStoreWriter.getProgress`) to decide which `customMetrics` keys to preserve.

   The existing 3-arg `copy(newNumRowsUpdated, newNumRowsDroppedByWatermark, newNumRowsRemoved)` signature is unchanged. Its body is updated to thread `snapshotCustomMetricNames` through, so that the round trip in `SessionWindowStateStoreSaveExec.getProgress` preserves it.

   The `TODO(SPARK-56537)` comment is removed from `ProgressReporter#resetExecStatsForNoExecution`, whose body is reduced to a single delegating map: `originExecStats.stateOperators.map(_.copyForNoExecution())`.

### Why are the changes needed?

Today, on a no-data ("idle") trigger progress event, `StateOperatorProgress` carries the previous batch's values for `allUpdatesTimeMs`, `allRemovalsTimeMs`, `commitTimeMs`, and most of `customMetrics`. To a user reading `query.lastProgress` / `query.recentProgress` during an idle period, this looks as if work was performed when none was. It is also a known source of test flakiness. The `TODO(SPARK-56537)` left by SPARK-56464 in `ProgressReporter#resetExecStatsForNoExecution` explicitly tracks this follow-up.

The design was discussed on the JIRA ticket and confirmed before implementation:
- Encode snapshot semantics at the metric definition (option (2b) in the audit comment), not via a hardcoded whitelist in the reset routine.
- Add a new `copyForNoExecution()` method on `StateOperatorProgress` rather than growing the existing `copy(...)` argument list further (the 3-argument signature left by SPARK-56464 would have grown to 6 or more arguments).

### Does this PR introduce _any_ user-facing change?

No public API change.
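Although the public API is unchanged, the reset rule behind the new idle-trigger values fits in a few lines. The Scala below is a minimal, self-contained model, not Spark code: `MetricDefModel`, `OperatorProgressModel`, and `IdleTriggerSketch` are hypothetical names, only the fields named in this PR are modeled, and `customMetrics` is a plain Scala `Map` for brevity. It assumes, as described above, that snapshot metric names are collected from the `isSnapshot` flag when progress is built, and that per-batch `customMetrics` entries are zeroed rather than dropped.

```scala
// Hypothetical, simplified model of the reset semantics described in this PR.
// None of these types are Spark classes; the names are illustrative only.

// Stand-in for a custom metric definition carrying the isSnapshot flag.
// The default is false, i.e. the metric is treated as per-batch.
final case class MetricDefModel(name: String, isSnapshot: Boolean = false)

// Stand-in for StateOperatorProgress, limited to the fields this PR discusses.
final case class OperatorProgressModel(
    numRowsTotal: Long,              // snapshot: kept on idle triggers
    numRowsUpdated: Long,            // per-batch
    numRowsRemoved: Long,            // per-batch
    numRowsDroppedByWatermark: Long, // per-batch
    allUpdatesTimeMs: Long,          // per-batch
    allRemovalsTimeMs: Long,         // per-batch
    commitTimeMs: Long,              // per-batch
    memoryUsedBytes: Long,           // snapshot: kept on idle triggers
    customMetrics: Map[String, Long],
    snapshotCustomMetricNames: Set[String] = Set.empty) {

  // Zero every per-batch field and every customMetrics entry whose name is not
  // a snapshot metric. Snapshot fields and snapshot entries keep their values.
  def copyForNoExecution(): OperatorProgressModel = copy(
    numRowsUpdated = 0L,
    numRowsRemoved = 0L,
    numRowsDroppedByWatermark = 0L,
    allUpdatesTimeMs = 0L,
    allRemovalsTimeMs = 0L,
    commitTimeMs = 0L,
    customMetrics = customMetrics.map { case (name, value) =>
      name -> (if (snapshotCustomMetricNames.contains(name)) value else 0L)
    })
}

object IdleTriggerSketch {
  // Derive the preserved-name set from the metric definitions,
  // so the reset routine needs no hardcoded whitelist.
  def snapshotNames(defs: Seq[MetricDefModel]): Set[String] =
    defs.filter(_.isSnapshot).map(_.name).toSet

  def main(args: Array[String]): Unit = {
    val defs = Seq(
      MetricDefModel("rocksdbPutCount"),                       // per-batch
      MetricDefModel("rocksdbCommitFlushLatency"),             // per-batch
      MetricDefModel("rocksdbSstFileSize", isSnapshot = true)) // snapshot

    // Progress as reported by the last batch that actually processed data.
    val lastDataBatch = OperatorProgressModel(
      numRowsTotal = 100L, numRowsUpdated = 10L, numRowsRemoved = 2L,
      numRowsDroppedByWatermark = 1L, allUpdatesTimeMs = 5L,
      allRemovalsTimeMs = 3L, commitTimeMs = 7L, memoryUsedBytes = 2048L,
      customMetrics = Map(
        "rocksdbPutCount" -> 10L,
        "rocksdbCommitFlushLatency" -> 4L,
        "rocksdbSstFileSize" -> 4096L),
      snapshotCustomMetricNames = snapshotNames(defs))

    // Progress reported on the following no-data (idle) trigger.
    val idle = lastDataBatch.copyForNoExecution()
    println(idle.commitTimeMs)                          // 0
    println(idle.customMetrics("rocksdbPutCount"))      // 0
    println(idle.customMetrics("rocksdbSstFileSize"))   // 4096
    println(idle.memoryUsedBytes)                       // 2048
  }
}
```

Keeping the flag on the definition means a new snapshot metric only has to declare `isSnapshot = true`. In this model the idle-trigger reset itself never changes.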
User-visible behavior change: idle-trigger progress events emitted via `StreamingQueryListener.QueryProgressEvent`, `query.lastProgress`, and `query.recentProgress` will now report `0` for all per-batch fields and per-batch `customMetrics` entries instead of carrying stale values from the previous data batch. Snapshot fields (`numRowsTotal`, `memoryUsedBytes`, `numShufflePartitions`, `numStateStoreInstances`, and snapshot custom metrics) are unchanged. This goes in the same direction as the SPARK-56464 fix; this PR completes the audit.

### How was this patch tested?

New and updated tests in `ProgressReporterSuite.scala`:

1. Extended the SPARK-56464 test with assertions that the three time fields (`allUpdatesTimeMs`, `allRemovalsTimeMs`, `commitTimeMs`) are reset to 0 on the idle trigger, alongside the existing row-count assertions. The test description is updated from "no-data batch resets numRowsRemoved to zero" to "no-data batch resets all per-batch StateOperatorProgress fields to zero" to reflect the broader scope.
2. New test "SPARK-56537: no-data batch resets per-batch customMetrics but preserves snapshot customMetrics (RocksDB)", which exercises the per-batch/snapshot split end-to-end against a real `RocksDBStateStoreProvider`. The test runs one data batch, advances the manual clock to trigger an idle progress event, and then asserts that 3 per-batch RocksDB metrics (`rocksdbCommitFlushLatency`, `rocksdbPutCount`, `rocksdbTotalBytesWritten`) are reset to 0 on idle, while 5 snapshot RocksDB metrics (`rocksdbPinnedBlocksMemoryUsage`, `rocksdbNumInternalColFamiliesKeys`, `rocksdbNumExternalColumnFamilies`, `rocksdbNumInternalColumnFamilies`, `rocksdbSstFileSize`) are preserved across the idle trigger.

Local verification:
- `build/sbt 'sql/testOnly *ProgressReporterSuite'` -> 2/2 tests pass.
- `build/sbt 'sql/testOnly *ProgressReporterSuite *StreamingQueryStatusAndProgressSuite *StreamingAggregationSuite *StreamingDeduplicationSuite *MultiStatefulOperatorsSuite'` -> 240 tests pass in 7m 16s.
- `dev/mima` -> no exclusions required.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.7
