juliuszsompolski opened a new pull request, #55585:
URL: https://github.com/apache/spark/pull/55585
### What changes were proposed in this pull request?

This PR optimizes the storage of per-partition tracking state in `LastAttemptRDDVals`, which backs `SQLLastAttemptMetric` (SLAM) introduced in [SPARK-56509](https://issues.apache.org/jira/browse/SPARK-56509).

In the original design, every `LastAttemptRDDVals` allocated three `Array[Int]` of size `numPartitions` to track `(stageId, stageAttemptId, taskAttemptNumber)` for each partition. In a typical execution without retries, all of those arrays contain identical values across all partitions: for a 200-partition RDD, that is ~2.4 KB of redundant state per RDD that uses a SLAM accumulator.

The three int arrays are replaced by a compact representation:

- **Common attempt** (3 ints): `commonStageId`, `commonStageAttemptId`, `commonTaskAttemptNumber`, set on the first `update()`. Partitions whose attempt matches the common values carry no per-partition stage/attempt state at all.
- **Computed bitmap** (`Array[Long]`): one bit per partition, packed into longs, tracking which partitions have been computed. It replaces the previous `EMPTY_ID` sentinel that was stored in all three int arrays.
- **Sparse overrides** (parallel `Array[Int]`s plus a per-partition bitmap): partitions whose attempt differs from the common one are recorded in parallel int arrays (`overridePartIds`, `overrideStageIds`, `overrideStageAttemptIds`, `overrideTaskAttemptNumbers`). All four arrays and the override bitmap stay `null` until the first override is recorded, so RDDs with no retries pay zero allocations for override storage. For partitions without an override, the bitmap lets `partialValueAt` / `findOverrideIdx` skip the linear scan with an O(1) bit check.

`partialValueAt` and `isEmptyAt` preserve their existing semantics, so `DAGScheduler.updateAccumulators` and the SLAM merge path are unchanged.
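A simplified Scala sketch of the compact layout, to make the three pieces concrete. This is a hypothetical illustration, not the code in this PR: the class name `CompactAttemptTracker`, the `update` / `attemptAt` / `numOverrides` signatures, the initial override capacity of 4 and the doubling growth policy are all assumptions, and the per-partition partial values are omitted. It writes `overrideSize` last, in the spirit of the publication order the PR relies on, but it is not meant as a proof of thread safety.

```scala
import java.util.Arrays

// Hypothetical sketch of compact attempt tracking; NOT Spark's LastAttemptRDDVals.
// Only (stageId, stageAttemptId, taskAttemptNumber) storage is modeled; partial values are omitted.
final class CompactAttemptTracker(numPartitions: Int) {
  // Common attempt, captured on the first update().
  private var hasCommon = false
  private var commonStageId = 0
  private var commonStageAttemptId = 0
  private var commonTaskAttemptNumber = 0

  // One bit per partition, packed into longs: has this partition been computed?
  private val computed = new Array[Long]((numPartitions + 63) / 64)

  // Sparse overrides: all null until the first attempt that differs from the common one.
  private var overrideBitmap: Array[Long] = null
  private var overridePartIds: Array[Int] = null
  private var overrideStageIds: Array[Int] = null
  private var overrideStageAttemptIds: Array[Int] = null
  private var overrideTaskAttemptNumbers: Array[Int] = null
  @volatile private var overrideSize = 0

  private def bitSet(bits: Array[Long], i: Int): Boolean =
    bits != null && (bits(i >>> 6) & (1L << (i & 63))) != 0L

  private def setBit(bits: Array[Long], i: Int): Unit =
    bits(i >>> 6) |= 1L << (i & 63)

  // Assumed growth policy: allocate 4 entries, then double when full.
  private def grow(a: Array[Int]): Array[Int] =
    if (a == null) new Array[Int](4) else Arrays.copyOf(a, a.length * 2)

  private def findOverrideIdx(partId: Int): Int = {
    val n = overrideSize // volatile read before touching the override arrays
    if (n == 0 || !bitSet(overrideBitmap, partId)) return -1 // O(1) fast path
    var i = 0
    while (i < n) {
      if (overridePartIds(i) == partId) return i
      i += 1
    }
    -1
  }

  def update(partId: Int, stageId: Int, stageAttemptId: Int, taskAttemptNumber: Int): Unit = {
    require(partId >= 0 && partId < numPartitions, s"bad partition $partId")
    if (!hasCommon) {
      commonStageId = stageId
      commonStageAttemptId = stageAttemptId
      commonTaskAttemptNumber = taskAttemptNumber
      hasCommon = true
    }
    setBit(computed, partId)
    val idx = findOverrideIdx(partId)
    if (idx >= 0) {
      // Already overridden: update the existing entry in place.
      overrideStageIds(idx) = stageId
      overrideStageAttemptIds(idx) = stageAttemptId
      overrideTaskAttemptNumbers(idx) = taskAttemptNumber
    } else if (stageId != commonStageId || stageAttemptId != commonStageAttemptId ||
        taskAttemptNumber != commonTaskAttemptNumber) {
      // New override: lazily allocate or grow the parallel arrays, then append.
      val n = overrideSize
      if (overridePartIds == null || n == overridePartIds.length) {
        overridePartIds = grow(overridePartIds)
        overrideStageIds = grow(overrideStageIds)
        overrideStageAttemptIds = grow(overrideStageAttemptIds)
        overrideTaskAttemptNumbers = grow(overrideTaskAttemptNumbers)
      }
      if (overrideBitmap == null) overrideBitmap = new Array[Long](computed.length)
      overridePartIds(n) = partId
      overrideStageIds(n) = stageId
      overrideStageAttemptIds(n) = stageAttemptId
      overrideTaskAttemptNumbers(n) = taskAttemptNumber
      setBit(overrideBitmap, partId)
      overrideSize = n + 1 // volatile write last, after the entry and its bitmap bit
    }
  }

  def isEmptyAt(partId: Int): Boolean = !bitSet(computed, partId)

  def attemptAt(partId: Int): Option[(Int, Int, Int)] =
    if (isEmptyAt(partId)) None
    else {
      val idx = findOverrideIdx(partId)
      if (idx >= 0) Some((overrideStageIds(idx), overrideStageAttemptIds(idx), overrideTaskAttemptNumbers(idx)))
      else Some((commonStageId, commonStageAttemptId, commonTaskAttemptNumber))
    }

  def numOverrides: Int = overrideSize
}
```

Keeping the common attempt in three plain fields means the no-retry path only touches the computed bitmap, and the override bitmap turns "is this partition overridden?" into a single bit test, so the linear scan over `overridePartIds` runs only for partitions that actually have an override.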
`toString` reconstructs the original per-partition `stageIds` / `stageAttemptIds` / `taskAttemptNumbers` view for debug logs, and additionally prints the common attempt, the computed bitmap, and the override entries so that the internal storage state is observable.

The single-writer/multi-reader concurrency contract is preserved: writes to the override arrays and bitmap happen before the volatile write to `overrideSize`, so a reader observing a non-zero size sees consistent contents up to that index. A reader racing the very first allocation either sees `overrideBitmap == null` or finds no bit set for the in-flight partition; `isOverridden` correctly reports both cases as "not overridden."

Memory comparison for a 200-partition RDD. The `partitionPartialVals` array of size N (e.g. `Array[Long](200)` ≈ 1.6 KB for `SQLLastAttemptMetric`) is required to hold the per-partition partial values and is unchanged; the numbers below are the additional attempt-tracking overhead per RDD on top of that array:

| State | Attempt-tracking overhead |
|-----------------------------------------|---------------------------|
| Before | ~2.4 KB (three `Array[Int](200)` plus headers) |
| After, no retries | ~50 B (3 common ints + ~32 B computed bitmap, i.e. 200 bits in 4 longs; override fields all `null`) |
| After, 5-partition partial retry | ~120 B (above + 4-entry override arrays partially used + small override bitmap) |
| After, full 200-partition retry (worst) | ~3.4 KB (above + four 200-element override int arrays) |

The full-retry case is bounded and slightly worse than the original (one extra `Array[Int]` plus the bitmap), but it is rare in practice; the common no-retry case improves by ~50x (~2.4 KB down to ~50 B).

### Why are the changes needed?
Follow-up to the SLAM PR ([SPARK-56509](https://issues.apache.org/jira/browse/SPARK-56509) / [#55371](https://github.com/apache/spark/pull/55371)), addressing a memory-efficiency concern raised in review: per-partition storage of `(stageId, stageAttemptId, taskAttemptNumber)` is redundant in the common case. If SLAM accumulators see broader use, for example as the basis for accurate Spark UI metrics under retries, many `SQLMetric` instances throughout a query plan would each carry a `LastAttemptRDDVals` per RDD that updated them. Reducing the per-RDD overhead from 3·N ints to a handful of ints plus a small bitmap is a meaningful win at scale.

### Does this PR introduce _any_ user-facing change?

No. This is an internal storage optimization for `LastAttemptRDDVals`. The public `LastAttemptAccumulator` / `SQLLastAttemptMetric` APIs, the values they return, and their behavior under stage retries are unchanged.

### How was this patch tested?

- All existing SLAM test suites pass unchanged, exercising the same code paths as before:
  - `SQLLastAttemptMetricUnitSuite`
  - `SQLLastAttemptMetricIntegrationSuite`
  - `SQLLastAttemptMetricIntegrationSuiteWithStageRetries`
  - `SQLLastAttemptMetricPlanShapesSuite`
  - `MetricsFailureInjectionSuite`

  175 tests, all green.
- New unit test `SQLLastAttemptMetricUnitSuite."compact storage: common attempt shared, overrides only for retries"` uses reflection on the internal fields to verify the storage transitions:
  - 5 same-attempt updates → `overrideSize == 0`, common values populated.
  - Stage-attempt retry of one partition → 1 override.
  - Re-retry of that partition → still 1 override (in-place update).
  - Different partition retried → 2 overrides.
  - Per-task retry (different `taskAttemptNumber`, same stage) → 3 overrides.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-opus-4-7)

-- 
This is an automated message from the Apache Git Service.
