[ 
https://issues.apache.org/jira/browse/SPARK-56509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-56509.
---------------------------------
    Fix Version/s: 4.2.0
       Resolution: Fixed

Issue resolved by pull request 55371
[https://github.com/apache/spark/pull/55371]

> SparkSQL Last Attempt Metrics
> -----------------------------
>
>                 Key: SPARK-56509
>                 URL: https://issues.apache.org/jira/browse/SPARK-56509
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 4.2.0
>            Reporter: Juliusz Sompolski
>            Assignee: Juliusz Sompolski
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 4.2.0
>
>
> #### The problem
> Regular Spark accumulators sum up values from all task attempts, including 
> retried ones. When a stage retry occurs — due to executor loss, fetch 
> failures, or AQE replanning — the accumulator value includes contributions 
> from both the original execution and the retry. This makes it impossible to 
> determine the "true" metric value of the final successful execution. For 
> example, if a stage processes 1000 rows and all of its tasks are rerun once, 
> `numOutputRows` reports 2000 instead of 1000.
> This is a fundamental limitation when accumulators are used for 
> post-execution correctness checking (e.g. verifying that the number of rows 
> written matches expectations), or when accurate execution statistics need to 
> be reported.
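
The double counting described above can be reproduced with a toy model that does not depend on Spark. This is only a sketch: `TaskUpdate` and `DoubleCountingSketch` are hypothetical names, and the 4 partitions of 250 rows each are made-up numbers chosen to match the 1000-row example.

```scala
// Toy model, not Spark code: each task attempt reports a partial row count.
final case class TaskUpdate(partition: Int, stageAttemptId: Int, rows: Long)

object DoubleCountingSketch {
  // A stage with 4 partitions of 250 rows each, run twice:
  // stage attempt 0 (original) and stage attempt 1 (full retry).
  val updates: Seq[TaskUpdate] =
    for {
      attempt   <- Seq(0, 1)
      partition <- 0 until 4
    } yield TaskUpdate(partition, attempt, rows = 250L)

  // What a plain summing accumulator sees: every attempt is merged in.
  val summed: Long = updates.map(_.rows).sum

  // What the final successful execution actually produced.
  val lastAttemptOnly: Long =
    updates.filter(_.stageAttemptId == 1).map(_.rows).sum
}
```

Here `DoubleCountingSketch.summed` is 2000 while `DoubleCountingSketch.lastAttemptOnly` is 1000, which is the gap the issue sets out to close.
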
> #### The solution
> `LastAttemptAccumulator` is a trait that can be mixed into any 
> `AccumulatorV2` subclass. It hooks into `DAGScheduler.updateAccumulators` to 
> receive per-task stage and attempt metadata alongside the regular accumulator 
> merge. It tracks per-RDD, per-partition partial values tagged with `(stageId, 
> stageAttemptId, taskAttemptNumber)`, and when queried, aggregates only the 
> values from the latest attempt of each partition.
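
To make the bookkeeping concrete, here is a minimal standalone sketch of per-RDD, per-partition tracking keyed by attempt metadata. It is not the code from the pull request: `AttemptTag`, `LastAttemptSumSketch`, and the rule that attempts are compared lexicographically on `(stageId, stageAttemptId, taskAttemptNumber)` are all assumptions made for illustration.

```scala
// Hypothetical names throughout; this models the idea, not Spark's internals.
final case class AttemptTag(stageId: Int, stageAttemptId: Int, taskAttemptNumber: Int)

object AttemptTag {
  // Assumption: "latest" means greatest under lexicographic comparison
  // of (stageId, stageAttemptId, taskAttemptNumber).
  implicit val ordering: Ordering[AttemptTag] =
    Ordering.by((t: AttemptTag) => (t.stageId, t.stageAttemptId, t.taskAttemptNumber))
}

final class LastAttemptSumSketch {
  // rddId -> partitionId -> (tag of the newest attempt seen, its partial value)
  private var partials = Map.empty[Int, Map[Int, (AttemptTag, Long)]]

  // Record a task's partial value, keeping it only if it is not older
  // than what is already stored for that partition.
  def update(rddId: Int, partitionId: Int, tag: AttemptTag, partial: Long): Unit = {
    val byPartition = partials.getOrElse(rddId, Map.empty[Int, (AttemptTag, Long)])
    val isNewer = byPartition.get(partitionId).forall { case (seen, _) =>
      AttemptTag.ordering.gteq(tag, seen)
    }
    if (isNewer) {
      partials = partials.updated(rddId, byPartition.updated(partitionId, (tag, partial)))
    }
  }

  // Sum the newest partial of every partition of one RDD; None if the RDD is unknown.
  def lastAttemptValue(rddId: Int): Option[Long] =
    partials.get(rddId).map(_.values.map(_._2).sum)
}
```

Feeding this sketch two partitions of 250 rows from stage attempt 0 and then again from stage attempt 1 yields `Some(500L)` for that RDD, because each retried partition overwrites its earlier partial instead of adding to it.
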
> #### API
> The core trait `LastAttemptAccumulator[IN, OUT, PARTIAL]` provides several 
> query methods to retrieve the last attempt value, depending on what scope is 
> of interest:
> - **`lastAttemptValueForRDDId(rddId)`** / 
> **`lastAttemptValueForRDDIds(rddIds)`** — Value from specific RDD(s). Useful 
> when the caller knows which RDD(s) the metric was used in.
> - **`lastAttemptValueForAllRDDs()`** — Value aggregated from all RDDs that 
> contributed to this accumulator. Useful when only one execution used the 
> accumulator.
> - **`lastAttemptValueForHighestRDDId()`** — Value from the RDD with the 
> highest ID, which corresponds to the most recent execution. A simple 
> heuristic that works well for single-use metrics in SQL plans.
> - **`lastAttemptValueForRDDScopes(rddScopeIds)`** — Value from RDDs matching 
> specific `RDDOperationScope` IDs, enabling tracking by SparkPlan node.
>
> All query methods return `Option[OUT]`; `None` means the last attempt value 
> cannot be determined (e.g. the accumulator was updated both from tasks and 
> directly on the driver, or an internal consistency check failed).
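
The difference between the query scopes, and the role of `None`, can be illustrated with a toy stand-in for the query surface. The method names mirror the ones listed above, but everything else is an assumption: the class `LastAttemptQueriesSketch`, the `updatedOnDriver` flag, and the choice to return `None` for an unknown RDD ID are illustrative and may not match the real trait's behaviour.

```scala
// Toy stand-in for the query surface described above; not the Spark implementation.
final class LastAttemptQueriesSketch(
    perRdd: Map[Int, Long],      // rddId -> last-attempt value, already aggregated per RDD
    updatedOnDriver: Boolean) {  // assumption: a flag recording direct driver-side updates

  // Return None whenever the last-attempt value cannot be trusted.
  private def guarded(value: => Option[Long]): Option[Long] =
    if (updatedOnDriver) None else value

  def lastAttemptValueForRDDId(rddId: Int): Option[Long] =
    guarded(perRdd.get(rddId))

  def lastAttemptValueForRDDIds(rddIds: Seq[Int]): Option[Long] =
    guarded {
      val found = rddIds.flatMap(id => perRdd.get(id))
      if (found.isEmpty) None else Some(found.sum)
    }

  def lastAttemptValueForAllRDDs(): Option[Long] =
    guarded(if (perRdd.isEmpty) None else Some(perRdd.values.sum))

  // The highest RDD ID stands in for "the most recent execution".
  def lastAttemptValueForHighestRDDId(): Option[Long] =
    guarded(if (perRdd.isEmpty) None else Some(perRdd(perRdd.keys.max)))
}
```

With `Map(3 -> 1000L, 9 -> 40L)` and no driver-side update, the all-RDDs query gives `Some(1040L)` while the highest-RDD-ID query gives `Some(40L)`; with `updatedOnDriver = true` every query gives `None`, so callers are expected to handle the missing case explicitly rather than fall back to a silently wrong number.
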
> The SQL extension `SQLLastAttemptAccumulator` adds Dataset-aware tracking:
> - **`lastAttemptValueForDataset(ds)`** / 
> **`lastAttemptValueForQueryExecution(qe)`** — Value from the execution of a 
> specific Dataset or QueryExecution. This works by extracting 
> `RDDOperationScope` IDs from the physical plan to identify which RDDs belong 
> to which SparkPlan nodes, and is aware of exchanges, subqueries, broadcast 
> joins, and WholeStageCodegen fallbacks.
>
> A ready-to-use concrete implementation `SQLLastAttemptMetric` (a `SQLMetric` 
> with `SQLLastAttemptAccumulator` mixed in) can be created via 
> `SQLLastAttemptMetrics.createMetric(sparkContext, name)`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
