[
https://issues.apache.org/jira/browse/SPARK-56537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18077580#comment-18077580
]
Dhruv Patel commented on SPARK-56537:
-------------------------------------
Thanks [~kabhwan]! Did the audit. Summary below.
h3. StateOperatorProgress fields
|| Field || Semantics || Reset today? ||
| numRowsTotal, memoryUsedBytes, numShufflePartitions, numStateStoreInstances | snapshot | no, correct |
| numRowsUpdated, numRowsRemoved, numRowsDroppedByWatermark | per-batch | yes |
| allUpdatesTimeMs, allRemovalsTimeMs, commitTimeMs | per-batch | _no, bug_ |
| customMetrics | mixed (see below) | partial |
h3. customMetrics
The metric trait already disambiguates most of it:
* \{{StateStoreCustomTimingMetric}} -- per-batch
* \{{StateStoreCustomSumMetric}} -- per-batch
* \{{StateStoreCustomSizeMetric}} -- split:
** snapshot: \{{rocksdbPinnedBlocksMemoryUsage}}, \{{rocksdbSstFileSize}},
\{{rocksdbNumInternalColFamiliesKeys}}, \{{rocksdbNumExternalColumnFamilies}},
\{{rocksdbNumInternalColumnFamilies}}
** per-batch byte/file counters: \{{rocksdbBytesWritten}},
\{{rocksdbBytesRead}}, \{{rocksdbCompactReadBytes}},
\{{rocksdbCompactWrittenBytes}}, \{{rocksdbFlushWrittenBytes}},
\{{rocksdbBytesCopied}},
\{{rocksdbZipFileBytesUncompressed}}, \{{rocksdbIteratorBytesRead}},
\{{rocksdbNumReplayChangeLogFiles}}
h3. Proposed plan, please confirm
# Unconditionally reset \{{allUpdatesTimeMs}}, \{{allRemovalsTimeMs}},
\{{commitTimeMs}} on the no-data path.
# For \{{customMetrics}}: reset all \{{Sum}} and \{{Timing}} entries. \{{Size}}
entries mix snapshot values with per-batch byte counters, so only the latter
should be reset. Two options -- (a) hardcode a whitelist of snapshot Size
metrics in the reset routine; (b) extend the \{{StateStoreCustomMetric}} trait
with an explicit \{{isSnapshot: Boolean}} (default false) so each provider
declares semantics at the source. (b) is a slightly bigger change but keeps the
truth at the metric definition. Preference?
# Out of scope unless you want it: \{{StateStoreInstanceMetric}}s. Most are
snapshot-ish with \{{ignoreIfUnchanged=true}} already.
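For concreteness, option (b) could look roughly like the sketch below. It uses simplified stand-in types rather than Spark's real {{StateStoreCustomMetric}} classes; the {{Sketch*}} names, {{resetPerBatchMetrics}}, and the {{exampleSum}} / {{exampleTimingMs}} metric names are all hypothetical. Only {{isSnapshot}} (default false) and the two RocksDB metric names come from the audit above.

```scala
// Simplified stand-ins for Spark's StateStoreCustomMetric hierarchy; not the real classes.
sealed trait SketchCustomMetric {
  def name: String
  // Hypothetical flag from option (b). Defaults to false, i.e. per-batch.
  def isSnapshot: Boolean = false
}
case class SketchSumMetric(name: String) extends SketchCustomMetric
case class SketchTimingMetric(name: String) extends SketchCustomMetric
case class SketchSizeMetric(name: String, override val isSnapshot: Boolean = false)
  extends SketchCustomMetric

// Zero every metric that is not declared as a snapshot. Names with no
// declaration are treated as per-batch, matching the default of false.
def resetPerBatchMetrics(
    declared: Seq[SketchCustomMetric],
    values: Map[String, Long]): Map[String, Long] = {
  val snapshotNames = declared.filter(_.isSnapshot).map(_.name).toSet
  values.map { case (metricName, value) =>
    metricName -> (if (snapshotNames.contains(metricName)) value else 0L)
  }
}

val declaredMetrics: Seq[SketchCustomMetric] = Seq(
  SketchSumMetric("exampleSum"),          // hypothetical metric name
  SketchTimingMetric("exampleTimingMs"),  // hypothetical metric name
  SketchSizeMetric("rocksdbSstFileSize", isSnapshot = true),
  SketchSizeMetric("rocksdbBytesWritten"))

// Values as they might look after the last batch that processed data.
val lastBatchValues: Map[String, Long] = Map(
  "exampleSum" -> 12L,
  "exampleTimingMs" -> 34L,
  "rocksdbSstFileSize" -> 4096L,
  "rocksdbBytesWritten" -> 512L)

// What a no-data trigger would report: only the snapshot metric survives.
val noDataValues: Map[String, Long] = resetPerBatchMetrics(declaredMetrics, lastBatchValues)
```

With this shape the reset routine needs no per-provider knowledge; a provider that adds a new snapshot Size metric only has to set the flag where the metric is defined.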
h3. API shape
Leaning towards adding \{{resetForNoExecution()}} on
\{{StateOperatorProgress}} rather than growing \{{copy(...)}}'s argument list
further (3 args today, would become 6+ with this fix). Co-locates the reset
knowledge with the data class. Open to either.
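A rough sketch of the method-on-the-data-class shape, assuming a trimmed stand-in for {{StateOperatorProgress}} (the real class has more fields and is not a case class). The {{SketchOperatorProgress}} name and the {{snapshotCustomMetrics}} parameter are hypothetical; how the snapshot set is derived depends on the choice in (2).

```scala
// Simplified stand-in for StateOperatorProgress; the field list is trimmed to
// the ones discussed in the audit.
case class SketchOperatorProgress(
    numRowsTotal: Long,
    numRowsUpdated: Long,
    numRowsRemoved: Long,
    numRowsDroppedByWatermark: Long,
    allUpdatesTimeMs: Long,
    allRemovalsTimeMs: Long,
    commitTimeMs: Long,
    memoryUsedBytes: Long,
    customMetrics: Map[String, Long]) {

  // Returns a copy suitable for a no-data trigger: per-batch fields are zeroed,
  // snapshot fields are kept. snapshotCustomMetrics is an assumed parameter
  // naming the custom metrics that must be preserved.
  def resetForNoExecution(snapshotCustomMetrics: Set[String]): SketchOperatorProgress =
    copy(
      numRowsUpdated = 0L,
      numRowsRemoved = 0L,
      numRowsDroppedByWatermark = 0L,
      allUpdatesTimeMs = 0L,
      allRemovalsTimeMs = 0L,
      commitTimeMs = 0L,
      customMetrics = customMetrics.map { case (name, value) =>
        name -> (if (snapshotCustomMetrics.contains(name)) value else 0L)
      })
}

// Progress as it might look after the last batch that processed data.
val lastProgress = SketchOperatorProgress(
  numRowsTotal = 100L,
  numRowsUpdated = 5L,
  numRowsRemoved = 2L,
  numRowsDroppedByWatermark = 1L,
  allUpdatesTimeMs = 40L,
  allRemovalsTimeMs = 10L,
  commitTimeMs = 25L,
  memoryUsedBytes = 2048L,
  customMetrics = Map("rocksdbSstFileSize" -> 4096L, "rocksdbBytesRead" -> 300L))

val noDataProgress = lastProgress.resetForNoExecution(Set("rocksdbSstFileSize"))
```

The caller then makes one call on the no-data path instead of passing every per-batch field to {{copy(...)}} individually.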
Master only as you noted, no backport. If you don't have strong opinions on
(2) and the API shape, I'll go with (b) + \{{resetForNoExecution()}}, land
tests in the new \{{ProgressReporterSuite}}, and ping you on the PR.
> Some stateful operator metrics are not reset in no batch trigger progress
> event
> -------------------------------------------------------------------------------
>
> Key: SPARK-56537
> URL: https://issues.apache.org/jira/browse/SPARK-56537
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 4.2.0
> Reporter: Jungtaek Lim
> Priority: Minor
>
> [https://github.com/apache/spark/pull/55331#discussion_r3103933756]
> We reset several metrics (about num rows) on stateful operator metrics for no
> batch trigger progress event, but still we have more metrics to be audited.
> This ticket tracks the effort.
