[ 
https://issues.apache.org/jira/browse/SPARK-56537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18077581#comment-18077581
 ] 

Dhruv Patel edited comment on SPARK-56537 at 5/2/26 2:42 AM:
-------------------------------------------------------------

Thanks [~kabhwan]! Did the audit -- summary below.

h3. Bug surface in \{{StateOperatorProgress}}

Three per-batch time fields are never reset on the no-data path: 
allUpdatesTimeMs, allRemovalsTimeMs, commitTimeMs.

The other three per-batch fields (numRowsUpdated, numRowsRemoved, 
numRowsDroppedByWatermark) are already reset. The five snapshot fields 
(numRowsTotal, memoryUsedBytes, numShufflePartitions, numStateStoreInstances, 
operatorName) should keep their values, as today.
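
A minimal sketch of the gap, assuming a simplified stand-in for StateOperatorProgress. OperatorProgressModel, resetToday and resetIntended are hypothetical names for illustration and are not the real Spark class or methods; only the field names come from the audit above.

```scala
object NoDataResetSketch {
  // Simplified stand-in holding only the eleven fields named above.
  // This is NOT the real StateOperatorProgress (no customMetrics, no JSON).
  final case class OperatorProgressModel(
      operatorName: String,
      numRowsTotal: Long,
      numRowsUpdated: Long,
      allUpdatesTimeMs: Long,
      numRowsRemoved: Long,
      allRemovalsTimeMs: Long,
      commitTimeMs: Long,
      memoryUsedBytes: Long,
      numRowsDroppedByWatermark: Long,
      numShufflePartitions: Long,
      numStateStoreInstances: Long)

  // Behaviour described as "today": only the three row counters are zeroed.
  def resetToday(p: OperatorProgressModel): OperatorProgressModel =
    p.copy(numRowsUpdated = 0L, numRowsRemoved = 0L, numRowsDroppedByWatermark = 0L)

  // Intended behaviour: additionally zero the three per-batch timers.
  // The five snapshot fields are never touched.
  def resetIntended(p: OperatorProgressModel): OperatorProgressModel =
    resetToday(p).copy(allUpdatesTimeMs = 0L, allRemovalsTimeMs = 0L, commitTimeMs = 0L)
}
```

With this model, resetToday leaves a stale commitTimeMs from the last real batch in the idle-trigger progress, while resetIntended reports 0 and still preserves numRowsTotal and memoryUsedBytes.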

h3. \{{customMetrics}} splits across two registries

The trait subtypes already encode most of it cleanly.

h4. (A) \{{StateStoreCustomMetric}} (provider-level)

* \{{StateStoreCustomTimingMetric}} -- always per-batch
* \{{StateStoreCustomSumMetric}} -- always per-batch
* \{{StateStoreCustomSizeMetric}} -- mostly per-batch byte/file counters, with 
a small snapshot subset

Snapshot Size metrics (the only ambiguous group, 6 names total):
* *RocksDB (5):* rocksdbSstFileSize, rocksdbPinnedBlocksMemoryUsage, 
rocksdbNumInternalColFamiliesKeys, rocksdbNumExternalColumnFamilies, 
rocksdbNumInternalColumnFamilies
* *HDFSBackedStateStoreProvider (1):* stateOnCurrentVersionSizeBytes

Provider tally for completeness:
* \{{RocksDBStateStoreProvider}}: 36 custom metrics (31 per-batch / 5 snapshot 
Size) + 1 instance metric (StateStoreSnapshotLastUploadInstanceMetric, 
ignoreIfUnchanged=false, max-combine -- snapshot-style).
* \{{HDFSBackedStateStoreProvider}}: 5 custom metrics (4 Sum per-batch / 1 
snapshot Size) + 1 instance metric (same 
StateStoreSnapshotLastUploadInstanceMetric).
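
To make the classification rule concrete, here is a rough sketch under stated assumptions. MetricModel and its three subclasses are hypothetical stand-ins that only mirror the three StateStoreCustomMetric subtypes by name; the six snapshot names are the ones listed above.

```scala
object CustomMetricClassificationSketch {
  // Hypothetical stand-ins for the Timing / Sum / Size subtypes.
  sealed trait MetricModel { def name: String }
  final case class TimingMetricModel(name: String) extends MetricModel
  final case class SumMetricModel(name: String) extends MetricModel
  final case class SizeMetricModel(name: String) extends MetricModel

  // The six snapshot Size names from the audit (5 RocksDB + 1 HDFS-backed).
  val snapshotSizeNames: Set[String] = Set(
    "rocksdbSstFileSize",
    "rocksdbPinnedBlocksMemoryUsage",
    "rocksdbNumInternalColFamiliesKeys",
    "rocksdbNumExternalColumnFamilies",
    "rocksdbNumInternalColumnFamilies",
    "stateOnCurrentVersionSizeBytes")

  // Timing and Sum are always per-batch; Size is per-batch unless listed above.
  def isPerBatch(m: MetricModel): Boolean = m match {
    case _: TimingMetricModel  => true
    case _: SumMetricModel     => true
    case SizeMetricModel(name) => !snapshotSizeNames.contains(name)
  }
}
```

Only the Size branch needs a name lookup, which is why the ambiguous group stays at six names.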

h4. (B) \{{StatefulOperatorCustomMetric}} (operator-level)

14 or 15 metrics across 3 operators (15 when skippedNullValueCount is 
declared), all StatefulOperatorCustomSumMetric, all per-batch by mechanism 
(verified by tracing every increment site):
* \{{BaseStreamingDeduplicateExec}}: numDroppedDuplicateRows.
* \{{StreamingSymmetricHashJoinExec}}: skippedNullValueCount (only declared 
when storeConf.skipNullsForStreamStreamJoins=true).
* \{{TransformWithStateExecBase}}: 13 metrics -- initialStateProcessingTimeMs, 
numValueStateVars, numListStateVars, numMapStateVars, numDeletedStateVars, 
timerProcessingTimeMs, numRegisteredTimers, numDeletedTimers, numExpiredTimers, 
numValueStateWithTTLVars, numListStateWithTTLVars, numMapStateWithTTLVars, 
numValuesRemovedDueToTTLExpiry. All incremented inside batch processing 
(StatefulProcessorHandleImpl, TransformWithStateExec) -- none are snapshot 
reads.

{panel:title=Worth flagging on TransformWithState|borderStyle=dashed|borderColor=#ccc|bgColor=#f4f5f7}
The six numXxxStateVars and numXxxStateWithTTLVars metrics are mechanically 
per-batch (incremented per getValueState / getListState / getMapState call) but 
a user may interpret them as snapshots ("how many state vars does my query 
have"). Resetting them to 0 on idle triggers means the dashboard value flickers 
between batches. If you'd rather preserve them, that suggests they want 
snapshot semantics and the operator should be setting them differently -- 
separate discussion. For SPARK-56537 I'd default to "reset (since they are 
mechanically per-batch)" unless you say otherwise.
{panel}
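
To illustrate the flicker concern from the panel, a toy sketch (not Spark code). It assumes each trigger either ran a batch that reported a value or was idle; the reported helper and the resetOnIdle flag are hypothetical.

```scala
object StateVarFlickerSketch {
  // Some(v): a batch ran and reported v. None: idle trigger, no batch.
  // Returns the value a dashboard would see after each trigger.
  def reported(triggers: Seq[Option[Long]], resetOnIdle: Boolean): Seq[Long] =
    triggers.scanLeft(0L) { (last, trigger) =>
      trigger match {
        case Some(v) => v
        case None    => if (resetOnIdle) 0L else last
      }
    }.tail // drop the synthetic initial 0
}
```

For a query that reports 3 state vars in each real batch with two idle triggers in between, the reset policy yields 3, 0, 0, 3 while the preserve policy yields 3, 3, 3, 3.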

No other stateful operators (StateStoreSaveExec, StateStoreRestoreExec, 
FlatMapGroupsWithStateExec, FlatMapGroupsInPandasWithStateExec, 
StreamingGlobalLimitExec, SessionWindowStateStoreRestoreExec, 
EventTimeWatermarkExec) override customStatefulOperatorMetrics. 
SessionWindowStateStoreSaveExec.getProgress overrides only to thread 
numRowsDroppedByWatermark from the upstream restore exec; the returned 
StateOperatorProgress still flows through resetExecStatsForNoExecution normally.

h3. Edge cases confirmed handled today

* lastExecution == null first-trigger case: guarded at ProgressReporter:602.
* Multi-operator queries: resetExecStatsForNoExecution already iterates per 
operator.
* SessionWindowStateStoreSaveExec override: no bypass.
* Spark UI: reads fields verbatim, benefits for free.
* Protobuf serializer: pure passthrough, no changes needed.

h3. Proposed plan -- want to confirm

# Reset allUpdatesTimeMs, allRemovalsTimeMs, commitTimeMs unconditionally on 
the no-data path. Straightforward.
# For customMetrics, reset every entry whose underlying metric is 
StateStoreCustomTimingMetric, StateStoreCustomSumMetric, or 
StatefulOperatorCustomSumMetric (all unambiguously per-batch). For 
StateStoreCustomSizeMetric, reset every entry except the six-name snapshot 
whitelist above. Two implementations:
#* *(a)* Hardcode the six snapshot Size names in resetExecStatsForNoExecution. 
Small whitelist, low maintenance burden, easy to read.
#* *(b)* Extend StateStoreCustomMetric with isSnapshot: Boolean (default 
false). Each provider declares semantics at the source. Slightly bigger 
surface, future-proof.
#* Given the snapshot subset is only six names, (a) feels right. Preference?
# Confirm TransformWithState's numXxxStateVars / numXxxStateWithTTLVars 
semantics -- reset (treat as per-batch, my default) or preserve (treat as 
user-visible snapshot, would be inconsistent with their type)?
# StateStoreInstanceMetric is out of scope -- both providers' 
StateStoreSnapshotLastUploadInstanceMetric is monotonic-max-combine and 
snapshot-style, so leaving it as-is is correct.
# *API shape:* leaning toward adding resetForNoExecution() on 
StateOperatorProgress rather than growing copy()'s arg list further (3 args 
after SPARK-56464, would become 6+ with this fix). Co-locates the reset logic 
with the data class. Open to either.
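
For comparison with the hardcoded whitelist, option (b) could look roughly like the sketch below. All types here are hypothetical models, not the real StateStoreCustomMetric hierarchy, and customMetrics is modelled as a plain Scala Map rather than the Java map the real class exposes.

```scala
object IsSnapshotFlagSketch {
  // Hypothetical model of option (b): each metric declares its own semantics.
  sealed trait MetricModel {
    def name: String
    def isSnapshot: Boolean = false // per-batch unless a provider says otherwise
  }
  final case class SumMetricModel(name: String) extends MetricModel
  final case class SizeMetricModel(
      name: String,
      override val isSnapshot: Boolean = false) extends MetricModel

  // Zero every custom metric value except those declared as snapshots.
  // Names missing from `declared` are treated as per-batch and zeroed too.
  def resetCustomMetrics(
      declared: Seq[MetricModel],
      values: Map[String, Long]): Map[String, Long] = {
    val keep = declared.filter(_.isSnapshot).map(_.name).toSet
    values.map { case (name, v) => name -> (if (keep.contains(name)) v else 0L) }
  }
}
```

The reset logic no longer needs to know any metric names; the trade-off is that every provider has to mark its snapshot metrics at the declaration site.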

Master only as you noted, no backport. If you don't have strong opinions on 
(2), (3), or (5), I'll go with (a) + reset-the-TWS-state-var-metrics + 
resetForNoExecution(), add tests in the new ProgressReporterSuite (extending 
the existing test plus one for customMetrics on RocksDB), and ping you on the 
PR.


was (Author: JIRAUSER312625):
Thanks [~kabhwan]! Did the audit -- summary below.

h3. Bug surface in \{{StateOperatorProgress}}

Three per-batch time fields are never reset on the no-data path: 
\{{allUpdatesTimeMs}}, \{{allRemovalsTimeMs}}, \{{commitTimeMs}}.

The other three per-batch fields (\{{numRowsUpdated}}, \{{numRowsRemoved}}, 
\{{numRowsDroppedByWatermark}}) are already reset. The five snapshot fields 
(\{{numRowsTotal}}, \{{memoryUsedBytes}}, \{{numShufflePartitions}}, 
\{{numStateStoreInstances}}, \{{operatorName}}) should keep their values, as 
today.

h3. \{{customMetrics}} splits across two registries

The trait subtypes already encode most of it cleanly.

h4. (A) \{{StateStoreCustomMetric}} (provider-level)

* \{{StateStoreCustomTimingMetric}} -- always per-batch
* \{{StateStoreCustomSumMetric}} -- always per-batch
* \{{StateStoreCustomSizeMetric}} -- mostly per-batch byte/file counters, with 
a small snapshot subset

Snapshot Size metrics (the only ambiguous group, 6 names total):
* *RocksDB (5):* \{{rocksdbSstFileSize}}, \{{rocksdbPinnedBlocksMemoryUsage}}, 
\{{rocksdbNumInternalColFamiliesKeys}}, \{{rocksdbNumExternalColumnFamilies}}, 
\{{rocksdbNumInternalColumnFamilies}}
* *HDFSBackedStateStoreProvider (1):* \{{stateOnCurrentVersionSizeBytes}}

Provider tally for completeness:
* \{{RocksDBStateStoreProvider}}: 36 custom metrics (31 per-batch / 5 snapshot 
Size) + 1 instance metric (\{{StateStoreSnapshotLastUploadInstanceMetric}}, 
\{{ignoreIfUnchanged=false}}, max-combine -- snapshot-style).
* \{{HDFSBackedStateStoreProvider}}: 5 custom metrics (4 Sum per-batch / 1 
snapshot Size) + 1 instance metric (same 
\{{StateStoreSnapshotLastUploadInstanceMetric}}).

h4. (B) \{{StatefulOperatorCustomMetric}} (operator-level)

14-15 metrics across 3 operators, all \{{StatefulOperatorCustomSumMetric}}, all 
per-batch by mechanism (verified by tracing every increment site):
* \{{BaseStreamingDeduplicateExec}}: \{{numDroppedDuplicateRows}}.
* \{{StreamingSymmetricHashJoinExec}}: \{{skippedNullValueCount}} (only 
declared when \{{storeConf.skipNullsForStreamStreamJoins=true}}).
* \{{TransformWithStateExecBase}}: 13 metrics -- 
\{{initialStateProcessingTimeMs}}, \{{numValueStateVars}}, 
\{{numListStateVars}}, \{{numMapStateVars}}, \{{numDeletedStateVars}}, 
\{{timerProcessingTimeMs}}, \{{numRegisteredTimers}}, \{{numDeletedTimers}}, 
\{{numExpiredTimers}}, \{{numValueStateWithTTLVars}}, 
\{{numListStateWithTTLVars}}, \{{numMapStateWithTTLVars}}, 
\{{numValuesRemovedDueToTTLExpiry}}. All incremented inside batch processing 
(\{{StatefulProcessorHandleImpl}}, \{{TransformWithStateExec}}) -- none are 
snapshot reads.

{panel:title=Worth flagging on 
TransformWithState|borderStyle=dashed|borderColor=#ccc|bgColor=#f4f5f7}
The six \{{numXxxStateVars}} and \{{numXxxStateWithTTLVars}} metrics are 
mechanically per-batch (incremented per 
\{{getValueState}}/\{{getListState}}/\{{getMapState}} call) but a user may 
interpret them as snapshots ("how many state vars does my query have"). 
Resetting them to 0 on idle triggers means the dashboard value flickers between 
batches. If you'd rather preserve them, that suggests they want snapshot 
semantics and the operator should be setting them differently -- separate 
discussion. For SPARK-56537 I'd default to "reset (since they are mechanically 
per-batch)" unless you say otherwise.
{panel}

No other stateful operators (\{{StateStoreSaveExec}}, 
\{{StateStoreRestoreExec}}, \{{FlatMapGroupsWithStateExec}}, 
\{{FlatMapGroupsInPandasWithStateExec}}, \{{StreamingGlobalLimitExec}}, 
\{{SessionWindowStateStoreRestoreExec}}, \{{EventTimeWatermarkExec}}) override 
\{{customStatefulOperatorMetrics}}. 
\{{SessionWindowStateStoreSaveExec.getProgress}} overrides only to thread 
\{{numRowsDroppedByWatermark}} from the upstream restore exec; the returned 
\{{StateOperatorProgress}} still flows through 
\{{resetExecStatsForNoExecution}} normally.

h3. Edge cases confirmed handled today

\{{lastExecution == null}} first-trigger case (guarded at 
\{{ProgressReporter:602}}), multi-operator queries 
(\{{resetExecStatsForNoExecution}} already iterates per operator), 
\{{SessionWindowStateStoreSaveExec}} override (no bypass), Spark UI (reads 
fields verbatim, benefits for free), protobuf serializer (pure passthrough, no 
changes needed).

h3. Proposed plan -- want to confirm

# Reset \{{allUpdatesTimeMs}}, \{{allRemovalsTimeMs}}, \{{commitTimeMs}} 
unconditionally on the no-data path. Straightforward.
# For \{{customMetrics}}, reset every entry whose underlying metric is 
\{{StateStoreCustomTimingMetric}}, \{{StateStoreCustomSumMetric}}, or 
\{{StatefulOperatorCustomSumMetric}} (all unambiguously per-batch). For 
\{{StateStoreCustomSizeMetric}}, reset every entry except the six-name snapshot 
whitelist above. Two implementations:
#* *(a)* Hardcode the six snapshot Size names in 
\{{resetExecStatsForNoExecution}}. Small whitelist, low maintenance burden, 
easy to read.
#* *(b)* Extend \{{StateStoreCustomMetric}} with \{{isSnapshot: Boolean}} 
(default \{{false}}). Each provider declares semantics at the source. Slightly 
bigger surface, future-proof.
#* Given the snapshot subset is only six names, (a) feels right. Preference?
# Confirm TransformWithState's \{{numXxxStateVars}}/\{{numXxxStateWithTTLVars}} 
semantics -- reset (treat as per-batch, my default) or preserve (treat as 
user-visible snapshot, would be inconsistent with their type)?
# \{{StateStoreInstanceMetric}} out of scope -- both providers' 
\{{StateStoreSnapshotLastUploadInstanceMetric}} is monotonic-max-combine and 
snapshot-style, so leaving as-is is correct.
# *API shape:* leaning toward adding \{{resetForNoExecution()}} on 
\{{StateOperatorProgress}} rather than growing \{{copy()}}'s arg list further 
(3 args after SPARK-56464, would become 6+ with this fix). Co-locates the reset 
logic with the data class. Open to either.

Master only as you noted, no backport. If you don't have strong opinions on 
(2), (3), or (5), I'll go with (a) + reset-the-TWS-state-var-metrics + 
\{{resetForNoExecution()}}, add tests in the new \{{ProgressReporterSuite}} 
(extending the existing test plus one for \{{customMetrics}} on RocksDB), and 
ping you on the PR.

> Some stateful operator metrics are not reset in no batch trigger progress 
> event
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-56537
>                 URL: https://issues.apache.org/jira/browse/SPARK-56537
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 4.2.0
>            Reporter: Jungtaek Lim
>            Priority: Minor
>
> [https://github.com/apache/spark/pull/55331#discussion_r3103933756]
> We reset several metrics (about num rows) on stateful operator metrics for no 
> batch trigger progress event, but still we have more metrics to be audited. 
> This ticket tracks the effort.


