[
https://issues.apache.org/jira/browse/SPARK-56537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18077581#comment-18077581
]
Dhruv Patel edited comment on SPARK-56537 at 5/2/26 2:24 AM:
-------------------------------------------------------------
Thanks [~kabhwan]! Did the audit -- summary below.
h3. Bug surface in \{{StateOperatorProgress}}
Three per-batch time fields are never reset on the no-data path:
\{{allUpdatesTimeMs}}, \{{allRemovalsTimeMs}}, \{{commitTimeMs}}.
The other three per-batch fields (\{{numRowsUpdated}}, \{{numRowsRemoved}},
\{{numRowsDroppedByWatermark}}) are already reset. The five snapshot fields
(\{{numRowsTotal}}, \{{memoryUsedBytes}}, \{{numShufflePartitions}},
\{{numStateStoreInstances}}, \{{operatorName}}) should keep their values, as
today.
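To make the field split concrete, here is a minimal sketch. It assumes a simplified stand-in for \{{StateOperatorProgress}}: \{{OperatorProgressSketch}} and \{{NoDataResetSketch}} are hypothetical names, and the real class has a different constructor plus \{{customMetrics}}. It only illustrates which fields would be zeroed on the no-data path and which would carry over.

```scala
object NoDataResetSketch {
  // Hypothetical, simplified stand-in for StateOperatorProgress.
  // Only the eleven fields discussed in the audit are modelled.
  final case class OperatorProgressSketch(
      operatorName: String,
      numRowsTotal: Long,
      numRowsUpdated: Long,
      allUpdatesTimeMs: Long,
      numRowsRemoved: Long,
      allRemovalsTimeMs: Long,
      commitTimeMs: Long,
      memoryUsedBytes: Long,
      numRowsDroppedByWatermark: Long,
      numShufflePartitions: Long,
      numStateStoreInstances: Long) {

    // Zero the six per-batch fields. The five snapshot fields
    // (operatorName, numRowsTotal, memoryUsedBytes, numShufflePartitions,
    // numStateStoreInstances) are carried over unchanged.
    def resetForNoExecution(): OperatorProgressSketch = copy(
      numRowsUpdated = 0L,
      allUpdatesTimeMs = 0L,
      numRowsRemoved = 0L,
      allRemovalsTimeMs = 0L,
      commitTimeMs = 0L,
      numRowsDroppedByWatermark = 0L)
  }

  def main(args: Array[String]): Unit = {
    // Illustrative values only; not taken from a real query.
    val last = OperatorProgressSketch(
      operatorName = "dedupe",
      numRowsTotal = 100L,
      numRowsUpdated = 10L,
      allUpdatesTimeMs = 25L,
      numRowsRemoved = 3L,
      allRemovalsTimeMs = 7L,
      commitTimeMs = 12L,
      memoryUsedBytes = 2048L,
      numRowsDroppedByWatermark = 1L,
      numShufflePartitions = 200L,
      numStateStoreInstances = 200L)
    println(last.resetForNoExecution())
  }
}
```

Today only the three row-count fields are zeroed; the sketch adds the three time fields to the same reset.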
h3. \{{customMetrics}} splits across two registries
The trait subtypes already encode most of the per-batch vs. snapshot distinction cleanly.
h4. (A) \{{StateStoreCustomMetric}} (provider-level)
* \{{StateStoreCustomTimingMetric}} -- always per-batch
* \{{StateStoreCustomSumMetric}} -- always per-batch
* \{{StateStoreCustomSizeMetric}} -- mostly per-batch byte/file counters, with
a small snapshot subset
Snapshot Size metrics (the only ambiguous group, 6 names total):
* *RocksDB (5):* \{{rocksdbSstFileSize}}, \{{rocksdbPinnedBlocksMemoryUsage}},
\{{rocksdbNumInternalColFamiliesKeys}}, \{{rocksdbNumExternalColumnFamilies}},
\{{rocksdbNumInternalColumnFamilies}}
* *HDFSBackedStateStoreProvider (1):* \{{stateOnCurrentVersionSizeBytes}}
Provider tally for completeness:
* \{{RocksDBStateStoreProvider}}: 36 custom metrics (31 per-batch / 5 snapshot
Size) + 1 instance metric (\{{StateStoreSnapshotLastUploadInstanceMetric}},
\{{ignoreIfUnchanged=false}}, max-combine -- snapshot-style).
* \{{HDFSBackedStateStoreProvider}}: 5 custom metrics (4 Sum per-batch / 1
snapshot Size) + 1 instance metric (same
\{{StateStoreSnapshotLastUploadInstanceMetric}}).
h4. (B) \{{StatefulOperatorCustomMetric}} (operator-level)
14-15 metrics across 3 operators (15 when \{{skippedNullValueCount}} is declared), all \{{StatefulOperatorCustomSumMetric}}, all
per-batch by mechanism (verified by tracing every increment site):
* \{{BaseStreamingDeduplicateExec}}: \{{numDroppedDuplicateRows}}.
* \{{StreamingSymmetricHashJoinExec}}: \{{skippedNullValueCount}} (only
declared when \{{storeConf.skipNullsForStreamStreamJoins=true}}).
* \{{TransformWithStateExecBase}}: 13 metrics --
\{{initialStateProcessingTimeMs}}, \{{numValueStateVars}},
\{{numListStateVars}}, \{{numMapStateVars}}, \{{numDeletedStateVars}},
\{{timerProcessingTimeMs}}, \{{numRegisteredTimers}}, \{{numDeletedTimers}},
\{{numExpiredTimers}}, \{{numValueStateWithTTLVars}},
\{{numListStateWithTTLVars}}, \{{numMapStateWithTTLVars}},
\{{numValuesRemovedDueToTTLExpiry}}. All incremented inside batch processing
(\{{StatefulProcessorHandleImpl}}, \{{TransformWithStateExec}}) -- none are
snapshot reads.
{panel:title=Worth flagging on TransformWithState|borderStyle=dashed|borderColor=#ccc|bgColor=#f4f5f7}
The six \{{numXxxStateVars}} and \{{numXxxStateWithTTLVars}} metrics are
mechanically per-batch (incremented per
\{{getValueState}}/\{{getListState}}/\{{getMapState}} call) but a user may
interpret them as snapshots ("how many state vars does my query have").
Resetting them to 0 on idle triggers means the dashboard value flickers between
batches. If you'd rather preserve them, that suggests they want snapshot
semantics and the operator should be setting them differently -- separate
discussion. For SPARK-56537 I'd default to "reset (since they are mechanically
per-batch)" unless you say otherwise.
{panel}
No other stateful operators (\{{StateStoreSaveExec}},
\{{StateStoreRestoreExec}}, \{{FlatMapGroupsWithStateExec}},
\{{FlatMapGroupsInPandasWithStateExec}}, \{{StreamingGlobalLimitExec}},
\{{SessionWindowStateStoreRestoreExec}}, \{{EventTimeWatermarkExec}}) override
\{{customStatefulOperatorMetrics}}.
\{{SessionWindowStateStoreSaveExec.getProgress}} overrides only to thread
\{{numRowsDroppedByWatermark}} from the upstream restore exec; the returned
\{{StateOperatorProgress}} still flows through
\{{resetExecStatsForNoExecution}} normally.
h3. Edge cases confirmed handled today
* \{{lastExecution == null}} first-trigger case: guarded at \{{ProgressReporter:602}}.
* Multi-operator queries: \{{resetExecStatsForNoExecution}} already iterates per operator.
* \{{SessionWindowStateStoreSaveExec}} override: no bypass.
* Spark UI: reads fields verbatim, benefits for free.
* Protobuf serializer: pure passthrough, no changes needed.
h3. Proposed plan -- want to confirm
# Reset \{{allUpdatesTimeMs}}, \{{allRemovalsTimeMs}}, \{{commitTimeMs}}
unconditionally on the no-data path. Straightforward.
# For \{{customMetrics}}, reset every entry whose underlying metric is
\{{StateStoreCustomTimingMetric}}, \{{StateStoreCustomSumMetric}}, or
\{{StatefulOperatorCustomSumMetric}} (all unambiguously per-batch). For
\{{StateStoreCustomSizeMetric}}, reset every entry except the six-name snapshot
whitelist above. Two implementations:
#* *(a)* Hardcode the six snapshot Size names in
\{{resetExecStatsForNoExecution}}. Small whitelist, low maintenance burden,
easy to read.
#* *(b)* Extend \{{StateStoreCustomMetric}} with \{{isSnapshot: Boolean}}
(default \{{false}}). Each provider declares semantics at the source. Slightly
bigger surface, future-proof.
#* Given the snapshot subset is only six names, (a) feels right. Preference?
# Confirm TransformWithState's \{{numXxxStateVars}}/\{{numXxxStateWithTTLVars}}
semantics -- reset (treat as per-batch, my default) or preserve (treat as
user-visible snapshot, would be inconsistent with their type)?
# \{{StateStoreInstanceMetric}} out of scope -- both providers'
\{{StateStoreSnapshotLastUploadInstanceMetric}} is monotonic-max-combine and
snapshot-style, so leaving as-is is correct.
# *API shape:* leaning toward adding \{{resetForNoExecution()}} on
\{{StateOperatorProgress}} rather than growing \{{copy()}}'s arg list further
(3 args after SPARK-56464, would become 6+ with this fix). Co-locates the reset
logic with the data class. Open to either.
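For item 2, the two options can be sketched side by side. This is a rough illustration under stated assumptions, not the actual Spark code: \{{MetricDecl}}, \{{Kind}}, \{{keepByWhitelist}}, \{{keepByFlag}} and \{{reset}} are hypothetical names, the real \{{customMetrics}} is a Java map, and the real metric traits do not carry an \{{isSnapshot}} field today. Names with no declaration (for example instance metrics, which are out of scope) are left untouched in this sketch.

```scala
object CustomMetricResetSketch {
  // Hypothetical stand-ins for the three StateStoreCustomMetric flavours.
  sealed trait Kind
  case object Timing extends Kind
  case object Sum extends Kind
  case object Size extends Kind

  // Hypothetical declaration record; isSnapshot is only read by option (b).
  final case class MetricDecl(name: String, kind: Kind, isSnapshot: Boolean = false)

  // Option (a): the six snapshot Size names from the audit, hardcoded.
  val snapshotSizeNames: Set[String] = Set(
    "rocksdbSstFileSize",
    "rocksdbPinnedBlocksMemoryUsage",
    "rocksdbNumInternalColFamiliesKeys",
    "rocksdbNumExternalColumnFamilies",
    "rocksdbNumInternalColumnFamilies",
    "stateOnCurrentVersionSizeBytes")

  // Option (a): keep a value only if it is a whitelisted Size metric.
  def keepByWhitelist(d: MetricDecl): Boolean =
    d.kind == Size && snapshotSizeNames.contains(d.name)

  // Option (b): keep a value only if the provider declared it a snapshot.
  def keepByFlag(d: MetricDecl): Boolean = d.isSnapshot

  // Zero every declared metric that the keep predicate rejects.
  def reset(
      metrics: Map[String, Long],
      decls: Seq[MetricDecl],
      keep: MetricDecl => Boolean): Map[String, Long] = {
    val byName = decls.map(d => d.name -> d).toMap
    metrics.map { case (name, value) =>
      byName.get(name) match {
        case Some(d) if !keep(d) => name -> 0L
        case _ => name -> value // snapshot or undeclared: leave untouched
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val decls = Seq(
      MetricDecl("rocksdbSstFileSize", Size, isSnapshot = true),
      MetricDecl("rocksdbBytesWritten", Size),
      MetricDecl("numDroppedDuplicateRows", Sum))
    // Illustrative values only.
    val last = Map(
      "rocksdbSstFileSize" -> 4096L,
      "rocksdbBytesWritten" -> 512L,
      "numDroppedDuplicateRows" -> 7L)
    println(reset(last, decls, keepByWhitelist))
    println(reset(last, decls, keepByFlag))
  }
}
```

Both predicates give the same result as long as the whitelist and the flags agree; the difference is only where the snapshot knowledge lives.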
Master only as you noted, no backport. If you don't have strong opinions on
(2), (3), or (5), I'll go with (a) + reset-the-TWS-state-var-metrics +
\{{resetForNoExecution()}}, add tests in the new \{{ProgressReporterSuite}}
(extending the existing test plus one for \{{customMetrics}} on RocksDB), and
ping you on the PR.
was (Author: JIRAUSER312625):
Thanks [~kabhwan]! Did the audit -- summary below.
The bug surface in StateOperatorProgress: allUpdatesTimeMs, allRemovalsTimeMs
and commitTimeMs are per-batch but never reset on the no-data path. The other
three per-batch fields (numRowsUpdated,
numRowsRemoved, numRowsDroppedByWatermark) are already handled. The five
snapshot fields (numRowsTotal, memoryUsedBytes, numShufflePartitions,
numStateStoreInstances, operatorName) should keep their
values, as today.
customMetrics is mixed. The trait subtypes already encode most of it:
StateStoreCustomTimingMetric and StateStoreCustomSumMetric are always
per-batch. StateStoreCustomSizeMetric is split -- some are
snapshots (rocksdbPinnedBlocksMemoryUsage, rocksdbSstFileSize,
rocksdbNumInternalColFamiliesKeys, rocksdbNumExternalColumnFamilies,
rocksdbNumInternalColumnFamilies) and some are per-batch byte/file
counters (rocksdbBytesWritten, rocksdbBytesRead, rocksdbCompactReadBytes,
rocksdbCompactWrittenBytes, rocksdbFlushWrittenBytes, rocksdbBytesCopied,
rocksdbZipFileBytesUncompressed,
rocksdbIteratorBytesRead, rocksdbNumReplayChangeLogFiles).
Proposed plan, want to confirm a few choices:
1. Reset allUpdatesTimeMs, allRemovalsTimeMs, commitTimeMs unconditionally on
the no-data path. Straightforward.
2. For customMetrics, reset all Sum and Timing entries. For per-batch Size
byte counters, two options: (a) hardcode a snapshot whitelist in the reset
routine, or (b) extend StateStoreCustomMetric with an
explicit isSnapshot flag (default false) so each provider declares semantics
at the source. (b) is a slightly bigger change but keeps the truth at the
metric definition. Any preference?
3. StateStoreInstanceMetric out of scope unless you want it -- most are
already snapshot-ish with ignoreIfUnchanged=true.
4. API shape: leaning towards adding resetForNoExecution() on
StateOperatorProgress rather than growing copy()'s argument list further (3
args after SPARK-56464, would become 6+ with this fix). Co-locates
the reset logic with the data class. Open to either.
Master only as you noted, no backport. If you don't have strong opinions on
(2) and (4), I'll go with (b) + resetForNoExecution(), add tests in the new
ProgressReporterSuite, and ping you on the PR.
> Some stateful operator metrics are not reset in no batch trigger progress
> event
> -------------------------------------------------------------------------------
>
> Key: SPARK-56537
> URL: https://issues.apache.org/jira/browse/SPARK-56537
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 4.2.0
> Reporter: Jungtaek Lim
> Priority: Minor
>
> [https://github.com/apache/spark/pull/55331#discussion_r3103933756]
> We reset several metrics (about num rows) on stateful operator metrics for no
> batch trigger progress event, but still we have more metrics to be audited.
> This ticket tracks the effort.