juliuszsompolski opened a new pull request, #55585:
URL: https://github.com/apache/spark/pull/55585

   ### What changes were proposed in this pull request?
   
   This PR optimizes the storage of per-partition tracking state in 
`LastAttemptRDDVals`, which backs `SQLLastAttemptMetric` (SLAM) introduced in 
[SPARK-56509](https://issues.apache.org/jira/browse/SPARK-56509).
   
   In the original design, every `LastAttemptRDDVals` allocated three 
`Array[Int]` of size `numPartitions` to track `(stageId, stageAttemptId, 
taskAttemptNumber)` for each partition. In a typical execution without retries, 
all of those arrays contain identical values across all partitions — for a 
200-partition RDD, that's ~2.4 KB of redundant state per RDD that uses a SLAM 
accumulator.
   
   The three int arrays are replaced by a compact representation:
   
   - **Common attempt** (3 ints): `commonStageId`, `commonStageAttemptId`, 
`commonTaskAttemptNumber`. Set on the first `update()`. Partitions whose 
attempt matches the common values carry no per-partition stage/attempt state at 
all.
   
   - **Computed bitmap** (`Array[Long]`): one bit per partition, packed into 
longs. Tracks which partitions have been computed. Replaces the previous 
`EMPTY_ID` sentinel that was stored in all three int arrays.
   
   - **Sparse overrides** (parallel `Array[Int]` plus a per-partition bitmap): 
partitions whose attempt differs from the common are recorded in parallel int 
arrays (`overridePartIds`, `overrideStageIds`, `overrideStageAttemptIds`, 
`overrideTaskAttemptNumbers`). All four arrays plus the override bitmap are 
`null` until the first override is recorded — RDDs with no retries pay zero 
allocations for override storage. The bitmap lets `partialValueAt` / 
`findOverrideIdx` skip the linear scan in O(1) for partitions without an 
override.
   
   `partialValueAt` and `isEmptyAt` preserve their existing semantics, so 
`DAGScheduler.updateAccumulators` and the SLAM merge path are unchanged. 
`toString` reconstructs the original per-partition `stageIds` / 
`stageAttemptIds` / `taskAttemptNumbers` view for debug logs, and additionally 
prints the common attempt, the computed bitmap, and the override entries so the 
internal storage state is observable.
   
   The single-writer/multi-reader concurrency contract is preserved: writes to 
the override arrays and bitmap happen before the volatile write to 
`overrideSize`, so a reader observing a non-zero size sees consistent contents 
up to that index; a reader racing the very first allocation either sees 
`overrideBitmap == null` or finds no bit set for the in-flight partition, both 
of which `isOverridden` reports correctly as "not overridden."
   
   Memory comparison for a 200-partition RDD. The `partitionPartialVals` array 
of size N (e.g. `Array[Long](200)` ≈ 1.6 KB for `SQLLastAttemptMetric`) is 
required to hold the per-partition partial values and is unchanged; the numbers 
below are the additional attempt-tracking overhead per RDD on top of that array:
   
   | State                                  | Attempt-tracking overhead |
   |----------------------------------------|---------------------------|
   | Before                                 | ~2.4 KB (three `Array[Int](200)` 
plus headers) |
   | After, no retries                      | ~50 B (3 common ints + ~25 B 
computed bitmap; override fields all `null`) |
   | After, 5-partition partial retry       | ~120 B (above + 4-entry override 
arrays partially used + small override bitmap) |
   | After, full 200-partition retry (worst)| ~3.4 KB (above + four 200-element 
override int arrays) |
   
   The full-retry case is bounded and slightly worse than the original (one 
extra `Array[Int]` plus the bitmap), but is rare in practice; the common 
no-retry case improves by ~50x.
   
   ### Why are the changes needed?
   
   Follow-up to the SLAM PR 
([SPARK-56509](https://issues.apache.org/jira/browse/SPARK-56509) / 
[#55371](https://github.com/apache/spark/pull/55371)), addressing a 
memory-efficiency concern raised in review: per-partition storage of `(stageId, 
stageAttemptId, taskAttemptNumber)` is redundant in the common case.
   
   If SLAM accumulators see broader use — for example as the basis for accurate 
Spark UI metrics under retries — many `SQLMetric` instances throughout a query 
plan would each carry a `LastAttemptRDDVals` per RDD that updated them. 
Reducing the per-RDD overhead from 3·N ints to a handful of ints plus a small 
bitmap is a meaningful win at scale.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No. This is an internal storage optimization for `LastAttemptRDDVals`. The 
public `LastAttemptAccumulator` / `SQLLastAttemptMetric` APIs, the values they 
return, and their behavior under stage retries are unchanged.
   
   ### How was this patch tested?
   
   - All existing SLAM test suites pass unchanged, exercising the same code 
paths as before:
     - `SQLLastAttemptMetricUnitSuite`
     - `SQLLastAttemptMetricIntegrationSuite`
     - `SQLLastAttemptMetricIntegrationSuiteWithStageRetries`
     - `SQLLastAttemptMetricPlanShapesSuite`
     - `MetricsFailureInjectionSuite`
   
     175 tests, all green.
   
   - New unit test `SQLLastAttemptMetricUnitSuite."compact storage: common 
attempt shared, overrides only for retries"` reflects on the internal fields to 
verify the storage transitions:
     - 5 same-attempt updates → `overrideSize == 0`, common values populated.
     - Stage-attempt retry of one partition → 1 override.
     - Re-retry of that partition → still 1 override (in-place update).
     - Different partition retried → 2 overrides.
     - Per-task retry (different `taskAttemptNumber`, same stage) → 3 overrides.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Generated-by: Claude Code (claude-opus-4-7)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to