[ https://issues.apache.org/jira/browse/SPARK-56509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wenchen Fan reassigned SPARK-56509:
-----------------------------------
Assignee: Juliusz Sompolski
> SparkSQL Last Attempt Metrics
> -----------------------------
>
> Key: SPARK-56509
> URL: https://issues.apache.org/jira/browse/SPARK-56509
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 4.2.0
> Reporter: Juliusz Sompolski
> Assignee: Juliusz Sompolski
> Priority: Major
> Labels: pull-request-available
>
> #### The problem
> Regular Spark accumulators sum up values from all task attempts, including
> retried ones. When a stage retry occurs — due to executor loss, fetch
> failures, or AQE replanning — the accumulator value includes contributions
> from both the original execution and the retry. This makes it impossible to
> determine the "true" metric value of the final successful execution. For
> example, if a stage processes 1000 rows and is retried once, `numOutputRows`
> reports 2000 instead of 1000.
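The arithmetic in this example can be reproduced with a small sketch in plain Scala, with no Spark dependency. It assumes, as the example implies, that the retry recomputes every partition of the stage (here 10 partitions of 100 rows each). `TaskUpdate` is a hypothetical type used only for illustration.

```scala
// Hypothetical record of one task attempt's contribution to a row counter.
final case class TaskUpdate(stageAttemptId: Int, partition: Int, rows: Long)

// Assumption: attempt 0 of the stage processes all 10 partitions (100 rows each),
// then the whole stage is re-run as attempt 1 and processes them again.
val updates: Seq[TaskUpdate] =
  for {
    attempt <- 0 to 1
    p <- 0 until 10
  } yield TaskUpdate(attempt, p, 100L)

// What a regular accumulator reports: every attempt is summed.
val naiveTotal: Long = updates.map(_.rows).sum

// What the last-attempt semantics aim for: only the latest attempt of each partition.
val lastAttemptTotal: Long =
  updates
    .groupBy(_.partition)
    .values
    .map(_.maxBy(_.stageAttemptId).rows)
    .sum

println(s"naive=$naiveTotal lastAttempt=$lastAttemptTotal") // naive=2000 lastAttempt=1000
```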
> This is a fundamental limitation when accumulators are used for
> post-execution correctness checking (e.g. verifying that the number of rows
> written matches expectations), or when accurate execution statistics need to
> be reported.
> #### The solution
> `LastAttemptAccumulator` is a trait that can be mixed into any
> `AccumulatorV2` subclass. It hooks into `DAGScheduler.updateAccumulators` to
> receive per-task stage and attempt metadata alongside the regular accumulator
> merge. It tracks per-RDD, per-partition partial values tagged with `(stageId,
> stageAttemptId, taskAttemptNumber)`, and when queried, aggregates only the
> values from the latest attempt of each partition.
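The tracking scheme can be sketched as follows. This is a simplified, hypothetical model, not the code in the pull request: `AttemptTag`, `LastAttemptTracker` and `latestSumForRDD` are illustrative names, partials are plain `Long` counters, and the sketch assumes that ordering tags lexicographically by `(stageId, stageAttemptId, taskAttemptNumber)` identifies the most recent attempt (Spark assigns stage IDs in increasing order).

```scala
import scala.collection.mutable

// Attempt metadata attached to each task's update, as listed in the issue.
final case class AttemptTag(stageId: Int, stageAttemptId: Int, taskAttemptNumber: Int)

// Assumption: lexicographic order on the tuple puts the most recent attempt last.
val tagOrdering: Ordering[AttemptTag] =
  Ordering.by[AttemptTag, (Int, Int, Int)](t => (t.stageId, t.stageAttemptId, t.taskAttemptNumber))

// Hypothetical stand-in for the per-RDD, per-partition state kept by the trait.
final class LastAttemptTracker {
  // (rddId, partitionId) -> (tag of the latest attempt seen, that attempt's partial value)
  private val partials = mutable.Map.empty[(Int, Int), (AttemptTag, Long)]

  // Called for every task update, next to the regular accumulator merge.
  def update(rddId: Int, partition: Int, tag: AttemptTag, partial: Long): Unit = {
    val key = (rddId, partition)
    partials.get(key) match {
      case Some((seen, _)) if tagOrdering.lteq(tag, seen) => () // stale or duplicate report: keep the newer one
      case _                                              => partials(key) = (tag, partial)
    }
  }

  // Aggregate only the latest attempt of each partition of the given RDD.
  def latestSumForRDD(rddId: Int): Long =
    partials.iterator.collect { case ((r, _), (_, v)) if r == rddId => v }.sum
}

val tracker = new LastAttemptTracker
tracker.update(rddId = 7, partition = 0, tag = AttemptTag(3, 0, 0), partial = 500L)
tracker.update(rddId = 7, partition = 1, tag = AttemptTag(3, 0, 0), partial = 500L)
// Stage 3 is resubmitted as attempt 1 and recomputes partition 1.
tracker.update(rddId = 7, partition = 1, tag = AttemptTag(3, 1, 0), partial = 500L)
println(tracker.latestSumForRDD(7)) // 1000 (a plain sum would give 1500)
```

Keeping only one `(tag, value)` pair per partition bounds the state by the number of partitions rather than the number of attempts, and out-of-order reports from an older attempt are ignored instead of overwriting newer data.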
> #### API
> The core trait `LastAttemptAccumulator[IN, OUT, PARTIAL]` provides several
> query methods to retrieve the last attempt value, depending on what scope is
> of interest:
> - **`lastAttemptValueForRDDId(rddId)`** /
> **`lastAttemptValueForRDDIds(rddIds)`** — Value from specific RDD(s). Useful
> when the caller knows which RDD(s) the metric was used in.
> - **`lastAttemptValueForAllRDDs()`** — Value aggregated from all RDDs that
> contributed to this accumulator. Useful when only one execution used the
> accumulator.
> - **`lastAttemptValueForHighestRDDId()`** — Value from the RDD with the
> highest ID, which corresponds to the most recent execution. A simple
> heuristic that works well for single-use metrics in SQL plans.
> - **`lastAttemptValueForRDDScopes(rddScopeIds)`** — Value from RDDs matching
> specific `RDDOperationScope` IDs, enabling tracking by SparkPlan node.
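The difference in scope between these query methods can be illustrated with a hypothetical, already deduplicated view of the tracked state. The method names below only mirror the ones listed in the issue. Unlike the real methods, they return plain `Long` sums instead of `Option[OUT]`, and the RDD-to-scope mapping is assumed to be known up front.

```scala
// Latest-attempt partial of one partition of one RDD (hypothetical, after deduplication).
final case class LatestPartial(rddId: Int, scopeId: String, partition: Int, value: Long)

final class LastAttemptQueries(latest: Seq[LatestPartial]) {
  private def sumWhere(keep: LatestPartial => Boolean): Long =
    latest.filter(keep).map(_.value).sum

  def forRDDId(rddId: Int): Long = sumWhere(_.rddId == rddId)
  def forRDDIds(rddIds: Set[Int]): Long = sumWhere(p => rddIds.contains(p.rddId))
  def forAllRDDs(): Long = sumWhere(_ => true)
  // Per the issue's heuristic: the highest RDD id corresponds to the most recent execution.
  def forHighestRDDId(): Long =
    if (latest.isEmpty) 0L else forRDDId(latest.map(_.rddId).max)
  def forRDDScopes(scopeIds: Set[String]): Long = sumWhere(p => scopeIds.contains(p.scopeId))
}

// Two executions used the same accumulator: RDD 10 (first run) and RDD 15 (second run).
val queries = new LastAttemptQueries(Seq(
  LatestPartial(rddId = 10, scopeId = "scope-1", partition = 0, value = 400L),
  LatestPartial(rddId = 10, scopeId = "scope-1", partition = 1, value = 600L),
  LatestPartial(rddId = 15, scopeId = "scope-2", partition = 0, value = 1000L)
))
println(queries.forRDDId(10))                 // 1000
println(queries.forRDDIds(Set(10, 15)))       // 2000
println(queries.forAllRDDs())                 // 2000
println(queries.forHighestRDDId())            // 1000
println(queries.forRDDScopes(Set("scope-2"))) // 1000
```

Here `forAllRDDs()` adds up both executions, which is why the issue recommends it only when a single execution used the accumulator.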
> All query methods return `Option[OUT]` — `None` means the last attempt value
> cannot be determined (e.g. the accumulator was updated both from tasks and
> directly on the driver, or an internal consistency check failed).
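One way a `None` result can arise is sketched below. The sketch assumes a counter that remembers whether it was ever updated directly on the driver, where no partition or attempt metadata exists. `GuardedLastAttemptCounter` and its methods are hypothetical. The issue does not describe its internal consistency checks, so only the mixed driver/task case is modeled.

```scala
import scala.collection.mutable

final class GuardedLastAttemptCounter {
  private var total = 0L            // what a regular accumulator reports (all attempts + driver adds)
  private var driverUpdated = false // set once any update arrives without task metadata
  private val latest = mutable.Map.empty[Int, (Int, Long)] // partition -> (attempt, partial)

  // Update merged from a finished task: regular sum plus tagged partial.
  def mergeFromTask(partition: Int, attempt: Int, partial: Long): Unit = {
    total += partial
    if (latest.get(partition).forall(_._1 < attempt)) latest(partition) = (attempt, partial)
  }

  // Direct add on the driver: it cannot be attributed to any partition or attempt.
  def addOnDriver(v: Long): Unit = {
    total += v
    driverUpdated = true
  }

  def value: Long = total

  // None signals that the last-attempt value cannot be determined.
  def lastAttemptValue: Option[Long] =
    if (driverUpdated) None
    else Some(latest.valuesIterator.map(_._2).sum)
}

val counter = new GuardedLastAttemptCounter
counter.mergeFromTask(partition = 0, attempt = 0, partial = 100L)
counter.mergeFromTask(partition = 0, attempt = 1, partial = 100L) // retry
println(counter.value)            // 200
println(counter.lastAttemptValue) // Some(100)
counter.addOnDriver(5L)
println(counter.lastAttemptValue) // None
```

A caller doing a post-execution correctness check would match on the result and treat `None` as "cannot verify" rather than as a mismatch.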
> The SQL extension `SQLLastAttemptAccumulator` adds Dataset-aware tracking:
> - **`lastAttemptValueForDataset(ds)`** /
> **`lastAttemptValueForQueryExecution(qe)`** — Value from the execution of a
> specific Dataset or QueryExecution. This works by extracting
> `RDDOperationScope` IDs from the physical plan to identify which RDDs belong
> to which SparkPlan nodes, and is aware of exchanges, subqueries, broadcast
> joins, and WholeStageCodegen fallbacks.
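The plan-based lookup can be approximated as a tree walk that gathers scope IDs, followed by filtering the tracked partials by those IDs. `PlanNode`, `RddPartial`, `collectScopeIds` and the string scope IDs are hypothetical simplifications of Spark's `SparkPlan` and `RDDOperationScope`. The actual handling of exchanges, subqueries, broadcasts and WholeStageCodegen fallbacks is likely more involved.

```scala
// Hypothetical, simplified physical plan node: the scope id(s) under which it creates RDDs,
// its children (including exchange and broadcast inputs) and any subquery plans it references.
final case class PlanNode(
    name: String,
    scopeIds: Set[String],
    children: Seq[PlanNode] = Nil,
    subqueries: Seq[PlanNode] = Nil)

// Latest-attempt partial of an RDD, tagged with the scope it was created in.
final case class RddPartial(rddId: Int, scopeId: String, value: Long)

// Collect every scope id reachable from the root, through children and subqueries.
def collectScopeIds(node: PlanNode): Set[String] =
  node.scopeIds ++
    node.children.flatMap(collectScopeIds) ++
    node.subqueries.flatMap(collectScopeIds)

// Sum only the partials whose RDDs were created by nodes of this plan.
def lastAttemptSumForPlan(plan: PlanNode, latestPartials: Seq[RddPartial]): Long = {
  val scopes = collectScopeIds(plan)
  latestPartials.filter(p => scopes.contains(p.scopeId)).map(_.value).sum
}

val broadcastSide =
  PlanNode("BroadcastExchange", Set("s4"), children = Seq(PlanNode("Scan b", Set("s5"))))
val join =
  PlanNode("BroadcastHashJoin", Set("s2"), children = Seq(PlanNode("Scan a", Set("s3")), broadcastSide))
val plan =
  PlanNode("WholeStageCodegen", Set("s1"), children = Seq(join),
    subqueries = Seq(PlanNode("ScalarSubquery", Set("s6"))))

val partials = Seq(
  RddPartial(rddId = 21, scopeId = "s3", value = 300L), // scan a, this query
  RddPartial(rddId = 22, scopeId = "s5", value = 50L),  // broadcast side, this query
  RddPartial(rddId = 30, scopeId = "s9", value = 999L)  // another query reusing the metric
)
println(lastAttemptSumForPlan(plan, partials)) // 350
```

In this model, collecting the scopes of every node, not only the WholeStageCodegen wrapper, keeps a codegen fallback covered: if the fused stage falls back to executing its children, their RDDs carry the children's scopes, which are already in the set.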
> A ready-to-use concrete implementation `SQLLastAttemptMetric` (a `SQLMetric`
> with `SQLLastAttemptAccumulator` mixed in) can be created via
> `SQLLastAttemptMetrics.createMetric(sparkContext, name)`.
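The mixin pattern behind `SQLLastAttemptMetric` can be illustrated without Spark. In this sketch, `MiniAccumulator` stands in for `AccumulatorV2`, and `LastAttemptTracking`, `mergeTaskUpdate` and `MiniMetrics.createMetric` are hypothetical names. The real factory takes a `SparkContext`, and per the issue the real per-task metadata arrives through `DAGScheduler.updateAccumulators`.

```scala
import scala.collection.mutable

// Stand-in for Spark's AccumulatorV2[IN, OUT]; NOT the real class.
abstract class MiniAccumulator[IN, OUT] {
  def add(v: IN): Unit
  def value: OUT
}

// A plain sum metric: like a regular accumulator, it counts every attempt.
class SumMetric(val name: String) extends MiniAccumulator[Long, Long] {
  private var sum = 0L
  def add(v: Long): Unit = sum += v
  def value: Long = sum
}

// Hypothetical mixin that can be added to any MiniAccumulator[Long, Long].
trait LastAttemptTracking { self: MiniAccumulator[Long, Long] =>
  // partition -> (attempt number, partial value reported by that attempt)
  private val latest = mutable.Map.empty[Int, (Int, Long)]

  // Hypothetical hook: performs the regular merge and also records the tagged partial.
  def mergeTaskUpdate(partition: Int, attempt: Int, partial: Long): Unit = {
    add(partial)
    if (latest.get(partition).forall(_._1 < attempt)) latest(partition) = (attempt, partial)
  }

  def lastAttemptValue: Long = latest.valuesIterator.map(_._2).sum
}

// Concrete metric with the tracking mixed in, analogous in spirit to SQLLastAttemptMetric.
class LastAttemptSumMetric(metricName: String) extends SumMetric(metricName) with LastAttemptTracking

// Hypothetical factory shaped like SQLLastAttemptMetrics.createMetric, minus the SparkContext.
object MiniMetrics {
  def createMetric(name: String): LastAttemptSumMetric = new LastAttemptSumMetric(name)
}

val numOutputRows = MiniMetrics.createMetric("numOutputRows")
numOutputRows.mergeTaskUpdate(partition = 0, attempt = 0, partial = 1000L)
numOutputRows.mergeTaskUpdate(partition = 0, attempt = 1, partial = 1000L) // retry of the same partition
println(numOutputRows.value)            // 2000
println(numOutputRows.lastAttemptValue) // 1000
```

Because the tracking lives in a trait, the base metric keeps its usual all-attempts `value`, and the last-attempt view is an additional query rather than a change in semantics.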
--
This message was sent by Atlassian Jira
(v8.20.10#820010)