Kent Yao created SPARK-56627:
--------------------------------
Summary: DecimalAggregates: peel scale-preserving widening Cast
for Sum to enable fast-path rewrite
Key: SPARK-56627
URL: https://issues.apache.org/jira/browse/SPARK-56627
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 4.1.0
Reporter: Kent Yao
## Background
The Catalyst optimizer rule `DecimalAggregates` (in
`org.apache.spark.sql.catalyst.optimizer`) rewrites `Sum(d : DECIMAL(p, s))`
with `p + 10 <= MAX_LONG_DIGITS (=18)` into
`MakeDecimal(Sum(UnscaledValue(d)), p + 10, s)`, replacing the generic
`CheckOverflow` decimal path with a Long-arithmetic fast-path that materially
reduces per-row cost on aggregate and window paths.
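
The arithmetic behind the fast-path can be illustrated outside Spark. The following is a minimal Python model, not Catalyst code: `unscaled_value`, `make_decimal`, and `fast_path_sum` are hypothetical stand-ins for `UnscaledValue`, `MakeDecimal`, and the rewritten `Sum`, and the sample values are invented for illustration.

```python
from decimal import Decimal

MAX_LONG_DIGITS = 18  # value quoted in the description above


def unscaled_value(d: Decimal, scale: int) -> int:
    """Model of UnscaledValue: the decimal as an integer count of 10**-scale units."""
    return int(d.scaleb(scale))


def make_decimal(unscaled: int, scale: int) -> Decimal:
    """Model of MakeDecimal: reattach the scale to an integer total."""
    return Decimal(unscaled).scaleb(-scale)


def fast_path_sum(values, precision: int, scale: int) -> Decimal:
    # Precondition from the rule: the DECIMAL(p + 10, s) result must fit in a Long.
    assert precision + 10 <= MAX_LONG_DIGITS
    total = sum(unscaled_value(v, scale) for v in values)  # plain integer adds
    return make_decimal(total, scale)


# Three hypothetical DECIMAL(7, 2) values.
values = [Decimal("12345.67"), Decimal("-0.01"), Decimal("99999.99")]
assert fast_path_sum(values, precision=7, scale=2) == sum(values)
```

The integer total and the decimal total agree because every input shares the same scale, so adding unscaled values is exact.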
This rewrite is bypassed today when the input is wrapped in a *scale-preserving
widening* Cast, e.g. `Sum(Cast(d : DECIMAL(p, s) AS DECIMAL(p', s)))` with
`p' > p` and matching scale. The Cast is opaque to the rule's pattern, so the
optimizer keeps the slower `CheckOverflow` path even though the cast is
provably value-preserving and `Sum(d)` on the uncast input would satisfy the
precondition `p + 10 <= 18`.
This pattern shows up naturally in user queries that pre-cast a narrow decimal
column to a wider decimal before aggregation, and in plans where another rule
has inserted the widening Cast (e.g. `PromoteStrings`, type-coercion lifts).
## Proposal
Add a peel step inside `DecimalAggregates` that:
1. Recognises `Sum(Cast(child, DECIMAL(p', s')))` where the child has type
`DECIMAL(p, s)`, `p' > p`, `s' == s` (scale preserved), and the inner
`Sum(child)` would satisfy the existing `p + 10 <= MAX_LONG_DIGITS`
precondition.
2. Peels the Cast and lets the existing fast-path rewrite fire on the inner
`Sum(child)`.
3. Is gated by a new internal SQLConf, default `false`, so existing plans are
byte-identical with the conf off.
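
The three steps above can be sketched on a toy expression tree. This is a hedged Python model under stated assumptions, not the proposed patch: `DecimalType`, `Column`, `Cast`, `Sum`, and `peel_widened_cast` are hypothetical classes and helpers, and the `enabled` flag stands in for the SQLConf gate. The sketch does not model how the rule keeps the aggregate's output type unchanged after the Cast is removed.

```python
from dataclasses import dataclass
from typing import Union

MAX_LONG_DIGITS = 18


@dataclass(frozen=True)
class DecimalType:
    precision: int
    scale: int


@dataclass(frozen=True)
class Column:
    name: str
    dtype: DecimalType


@dataclass(frozen=True)
class Cast:
    child: "Expr"
    dtype: DecimalType


@dataclass(frozen=True)
class Sum:
    child: "Expr"


Expr = Union[Column, Cast, Sum]


def peel_widened_cast(expr: Sum, enabled: bool) -> Sum:
    """Return Sum(child) when the Cast qualifies for peeling, else expr unchanged."""
    if not enabled or not isinstance(expr.child, Cast):
        return expr  # gate off, or nothing to peel
    cast = expr.child
    inner = cast.child
    if not isinstance(inner, Column):
        return expr  # toy model: only peel casts directly over a column
    src, dst = inner.dtype, cast.dtype
    qualifies = (
        dst.precision > src.precision  # widening: p' > p
        and dst.scale == src.scale  # scale preserved: s' == s
        and src.precision + 10 <= MAX_LONG_DIGITS  # existing fast-path precondition
    )
    return Sum(inner) if qualifies else expr


d = Column("d", DecimalType(7, 2))
widened = Sum(Cast(d, DecimalType(17, 2)))
assert peel_widened_cast(widened, enabled=True) == Sum(d)  # peel fires
assert peel_widened_cast(widened, enabled=False) == widened  # gate off: unchanged
```

With the flag off the function returns its input untouched, which mirrors the intent that existing plans stay identical when the conf is disabled.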
Scope:
* Phase 1: `Sum` only. AVG and other aggregates are out of scope (tracked
separately if needed).
* Aggregate arm and Window arm of `Sum` are both covered by a single peel point
in the rule.
* Rule plumbing only — no public API change, no SQL grammar change, no metrics
change.
## SQLConf
* Key: `spark.sql.optimizer.decimalAggregates.peelWidenedCastForSum.enabled`
* Type: `boolean`
* Default: `false`
* Visibility: `.internal()`
* Docstring: gated peel step inside `DecimalAggregates`; rollback knob.
## Behaviour change
* With SQLConf default (`false`): byte-identical to today on every plan. No row
count, no result, no plan shape difference.
* With SQLConf `true`: only the qualifying `Sum(Cast(...))` shape changes —
gains the existing fast-path. Numerical equivalence covered by property-based
tests in `DataFrameAggregateSuite` (oracle-1/2/4 fixed witnesses).
## Verification
* Plan-shape property-based tests (catalyst): `DecimalAggregatesSuite` —
oracle-1 (peel fires when contract holds) + oracle-2 (peel does not fire
outside contract) + idempotence on `plan.canonicalized`.
* Numerical-equivalence property-based tests (sql-core):
`DataFrameAggregateSuite` — random witness sweep + fixed `(p=8, p'=30, s=2)`
witness.
* Benchmark: new `DecimalAggregatesBenchmark` under
`sql/core/src/test/scala/.../execution/benchmark/`. In a local JDK 17 smoke run
(AMD EPYC 7763, N=1M, iters=2), peel-on per-row ns is strictly lower than
peel-off on all 6 cases (4 Aggregate + 2 Window) with
`spark.sql.decimalOperations.allowPrecisionLoss` set to both `true` and
`false`. Case A1 (p=7, s=2, p'=17) shows a ~1.2-1.3x speedup. Multi-JDK GHA
results will be attached on PR open.
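
The shape of the numerical-equivalence check (a fixed witness plus a random sweep) can be sketched as follows. This is a standalone Python model, not the actual suite: `widen`, `slow_sum`, `fast_sum`, `random_column`, and `check_witness` are hypothetical helpers, and Python's `decimal` module stands in for Spark's decimal arithmetic.

```python
import random
from decimal import Decimal, localcontext

MAX_LONG_DIGITS = 18
LONG_MAX = 2**63 - 1


def widen(d: Decimal, target_precision: int, scale: int) -> Decimal:
    """Model of a scale-preserving widening cast: same scale, more digits allowed."""
    with localcontext() as ctx:
        ctx.prec = 38
        out = d.quantize(Decimal(1).scaleb(-scale))
    assert len(out.as_tuple().digits) <= target_precision
    return out


def slow_sum(values, target_precision: int, scale: int) -> Decimal:
    """Sum over the widened values using decimal arithmetic."""
    with localcontext() as ctx:
        ctx.prec = 38  # Spark's maximum decimal precision
        return sum((widen(v, target_precision, scale) for v in values), Decimal(0))


def fast_sum(values, scale: int) -> Decimal:
    """Sum over unscaled integers, checked to stay inside a signed 64-bit range."""
    total = sum(int(v.scaleb(scale)) for v in values)
    assert abs(total) <= LONG_MAX
    return Decimal(total).scaleb(-scale)


def random_column(rng, precision: int, scale: int, rows: int):
    bound = 10**precision - 1
    return [Decimal(rng.randint(-bound, bound)).scaleb(-scale) for _ in range(rows)]


def check_witness(p: int, p_wide: int, s: int, rows: int = 1000, seed: int = 0) -> bool:
    values = random_column(random.Random(seed), p, s, rows)
    return slow_sum(values, p_wide, s) == fast_sum(values, s)


# Fixed witness named in the description, then a small random sweep.
assert check_witness(p=8, p_wide=30, s=2)
rng = random.Random(1)
for _ in range(50):
    p = rng.randint(1, MAX_LONG_DIGITS - 10)
    s = rng.randint(0, p)
    p_wide = rng.randint(p + 1, 38)
    assert check_witness(p, p_wide, s, rows=200, seed=rng.randint(0, 10**6))
```

In this model the two sums agree by construction; the real suite would compare results produced by Spark with the conf on and off.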
## Public API change
None.
## SPIP
Not required. Internal optimizer rule, gated by an internal SQLConf default
`false`. No public API, no semantic change, no grammar change.
## Rollback
Disable the SQLConf.
## Affects Version
4.1.0 (master).