[ https://issues.apache.org/jira/browse/SPARK-57220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated SPARK-57220:
-----------------------------------
Labels: pull-request-available (was: )
> Extend block-chunked segment-tree window frame to shrinking frames (UNBOUNDED
> FOLLOWING)
> ----------------------------------------------------------------------------------------
>
> Key: SPARK-57220
> URL: https://issues.apache.org/jira/browse/SPARK-57220
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 4.3.0
> Reporter: Anupam Yadav
> Priority: Major
> Labels: pull-request-available
>
> h2. Background
> SPARK-56546 introduced {{SegmentTreeWindowFunctionFrame}} for non-invertible
> *sliding* aggregates, replacing the O(N*W) full-recompute path with O(N log
> W). The same data structure can answer arbitrary {{[lower, upper)}} queries,
> including the case where the upper bound is the partition end.
> This JIRA proposes extending the existing segment-tree implementation to the
> *shrinking* frame shape, i.e. {{... ROWS/RANGE BETWEEN <lower> AND UNBOUNDED
> FOLLOWING}}.
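A minimal sketch of the underlying idea, written in Python rather than Spark's Scala. A bottom-up segment tree over an associative merge with an identity element answers any half-open range {{[lower, upper)}} in O(log n) merges. A shrinking frame is then just the query with {{upper}} fixed at the partition size. The class and method names are illustrative and do not mirror the actual {{SegmentTreeWindowFunctionFrame}} API.

```python
from typing import Callable, Generic, List, TypeVar

T = TypeVar("T")


class SegmentTree(Generic[T]):
    """Bottom-up segment tree over an associative merge with an identity element."""

    def __init__(self, values: List[T], merge: Callable[[T, T], T], identity: T):
        self.n = len(values)
        self.merge = merge
        self.identity = identity
        # Leaves live at [n, 2n); internal node i combines children 2i and 2i + 1.
        self.tree: List[T] = [identity] * (2 * self.n)
        self.tree[self.n:] = values
        for i in range(self.n - 1, 0, -1):
            self.tree[i] = merge(self.tree[2 * i], self.tree[2 * i + 1])

    def query(self, lower: int, upper: int) -> T:
        """Aggregate of values[lower:upper] using O(log n) merges.

        Separate left and right accumulators keep non-commutative merges in order.
        """
        left_acc, right_acc = self.identity, self.identity
        lo, hi = lower + self.n, upper + self.n
        while lo < hi:
            if lo & 1:
                left_acc = self.merge(left_acc, self.tree[lo])
                lo += 1
            if hi & 1:
                hi -= 1
                right_acc = self.merge(self.tree[hi], right_acc)
            lo //= 2
            hi //= 2
        return self.merge(left_acc, right_acc)


values = [4, 1, 7, 3, 9, 2]
min_tree = SegmentTree(values, min, float("inf"))
# ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING: row i sees [i, n),
# so the upper bound is the constant partition size for every row.
suffix_min = [min_tree.query(i, min_tree.n) for i in range(min_tree.n)]  # [1, 1, 2, 2, 2, 2]

sum_tree = SegmentTree(values, lambda a, b: a + b, 0)
```

Because the tree is built once per partition, sliding and shrinking frames differ only in how each row's {{upper}} is computed.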
> h2. Current behaviour
> For frames of the form {{... BETWEEN <lower> AND UNBOUNDED FOLLOWING}}, the
> dispatcher in {{WindowEvaluatorFactoryBase.scala}} (lines 282-289 on master)
> always selects {{UnboundedFollowingWindowFunctionFrame}}, which recomputes
> the suffix aggregate from scratch for every output row. The class scaladoc
> explicitly acknowledges the cost (WindowFunctionFrame.scala:636):
> {quote}
> This is a very expensive operator to use, O(n * (n - 1) / 2), because we need
> to maintain a buffer and must do full recalculation after each row.
> {quote}
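To make the quoted cost concrete, here is a simplified model (not Spark's code) of a from-scratch suffix recompute for {{ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING}}. Row i folds the n - i rows of its frame, which costs n - i - 1 pairwise merges, so the partition total is n(n - 1)/2. The function name {{naive_shrinking}} is hypothetical.

```python
from typing import Callable, List, Tuple, TypeVar

T = TypeVar("T")


def naive_shrinking(values: List[T], merge: Callable[[T, T], T]) -> Tuple[List[T], int]:
    """Recompute the [i, n) aggregate from scratch for every row i.

    Returns the per-row results and the total number of merge calls.
    """
    n = len(values)
    results: List[T] = []
    merges = 0
    for i in range(n):
        acc = values[i]  # the frame always contains the current row
        for j in range(i + 1, n):
            acc = merge(acc, values[j])
            merges += 1
        results.append(acc)
    return results, merges


rows, merge_count = naive_shrinking([4, 1, 7, 3, 9, 2], min)
# rows == [1, 1, 2, 2, 2, 2]; merge_count == 6 * 5 // 2 == 15
```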
> The segtree path added by SPARK-56546 already builds the segment tree over
> the full partition in {{prepare()}} and supports {{queryInto(lower, upper,
> ...)}} for any subrange, but it is only wired into the moving-frame branch of
> the dispatcher (lines 291-331). Shrinking frames bypass it entirely, even
> though the upper bound is the trivial constant {{tree.size}}.
> h2. Proposal
> Extend {{SegmentTreeWindowFunctionFrame}} to also handle the shrinking case.
> Two changes:
> # Make {{ubound}} an {{Option[BoundOrdering]}} on the constructor. {{None}}
> means shrinking (upper is the partition end); {{Some(ub)}} preserves the
> existing sliding behaviour. Add a {{fallbackFactory: () =>
> WindowFunctionFrame}} so the small-partition path can produce
> {{SlidingWindowFunctionFrame}} for sliding and
> {{UnboundedFollowingWindowFunctionFrame}} for shrinking.
> # Add a shrinking-frame branch to the dispatcher in
> {{WindowEvaluatorFactoryBase}} that consults the same {{eligibleForSegTree}}
> gate and, on success, constructs a {{SegmentTreeWindowFunctionFrame}} with
> {{ubound = None}}.
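A sketch of the proposed constructor shape, under several assumptions. Python classes stand in for the Scala ones, MIN is the only aggregate, and bounds are modeled as plain functions from (row index, partition size) to a row offset, with {{ubound=None}} playing the role of {{None}} in {{Option[BoundOrdering]}}. {{SegmentTreeMinFrame}}, {{NaiveMinFrame}}, and {{min_partition_rows}} are hypothetical names. One naive class stands in for both {{SlidingWindowFunctionFrame}} and {{UnboundedFollowingWindowFunctionFrame}} as the fallback.

```python
from typing import Callable, List, Optional

Bound = Callable[[int, int], int]  # (row index, partition size) -> row offset
INF = float("inf")


def build_min_tree(values: List[float]) -> List[float]:
    n = len(values)
    tree = [INF] * n + list(values)  # leaves at [n, 2n)
    for i in range(n - 1, 0, -1):
        tree[i] = min(tree[2 * i], tree[2 * i + 1])
    return tree


def query_min(tree: List[float], n: int, lo: int, hi: int) -> float:
    """MIN over leaves [lo, hi); MIN is commutative, so one accumulator suffices."""
    res = INF
    lo, hi = lo + n, hi + n
    while lo < hi:
        if lo & 1:
            res = min(res, tree[lo])
            lo += 1
        if hi & 1:
            hi -= 1
            res = min(res, tree[hi])
        lo //= 2
        hi //= 2
    return res


class NaiveMinFrame:
    """Stand-in for the legacy frames: recomputes every row's frame from scratch."""

    def __init__(self, lbound: Bound, ubound: Optional[Bound]):
        self.lbound, self.ubound = lbound, ubound

    def prepare(self, rows: List[float]) -> None:
        self.rows = list(rows)

    def value_at(self, i: int) -> Optional[float]:
        n = len(self.rows)
        upper = n if self.ubound is None else min(n, self.ubound(i, n))
        frame = self.rows[max(0, self.lbound(i, n)):upper]
        return min(frame) if frame else None  # empty frame -> NULL


class SegmentTreeMinFrame:
    """ubound=None models UNBOUNDED FOLLOWING: the upper bound is the partition end."""

    def __init__(self, lbound: Bound, ubound: Optional[Bound],
                 fallback_factory: Callable[[], NaiveMinFrame], min_partition_rows: int):
        self.lbound, self.ubound = lbound, ubound
        self.fallback_factory = fallback_factory
        self.min_partition_rows = min_partition_rows
        self.fallback: Optional[NaiveMinFrame] = None

    def prepare(self, rows: List[float]) -> None:
        self.n = len(rows)
        if self.n < self.min_partition_rows:
            # Small partition: delegate to whatever frame the factory produces.
            self.fallback = self.fallback_factory()
            self.fallback.prepare(rows)
        else:
            self.fallback = None
            self.tree = build_min_tree(rows)

    def value_at(self, i: int) -> Optional[float]:
        if self.fallback is not None:
            return self.fallback.value_at(i)
        lower = max(0, self.lbound(i, self.n))
        upper = self.n if self.ubound is None else min(self.n, self.ubound(i, self.n))
        return query_min(self.tree, self.n, lower, upper) if lower < upper else None


def current_row(i: int, n: int) -> int:
    return i


def one_following_exclusive(i: int, n: int) -> int:
    return i + 2  # exclusive upper bound for "... AND 1 FOLLOWING"


shrinking = SegmentTreeMinFrame(current_row, None,
                                lambda: NaiveMinFrame(current_row, None), min_partition_rows=4)
shrinking.prepare([4, 1, 7, 3, 9, 2])
print([shrinking.value_at(i) for i in range(6)])  # [1, 1, 2, 2, 2, 2]
```

Inside the frame, the only sliding/shrinking difference is how {{upper}} is computed; the build, query, and small-partition fallback paths are shared.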
> All other infrastructure (eligibility, build, spill via
> {{TaskMemoryManager}}, the {{minPartitionRows}} fallback, the SQLMetrics for
> segtree-frames-built / fallbacks) is reused as-is.
> h2. Behaviour
> * Same opt-in conf, {{spark.sql.window.segmentTree.enabled}} (default
> {{false}}), so users on shrinking frames stay on the legacy path unless they opt in.
> * Same eligibility allowlist (DeclarativeAggregate with {{mergeExpressions}},
> no FILTER, no DISTINCT).
> * Same fallback for partitions below
> {{spark.sql.window.segmentTree.minPartitionRows}}, but to
> {{UnboundedFollowingWindowFunctionFrame}} instead of
> {{SlidingWindowFunctionFrame}}.
> * No analyzer / SQL grammar / plan-shape changes.
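The planning-time choice can be sketched as a small function. This is a hypothetical model, not {{WindowEvaluatorFactoryBase}}: it covers only the moving and shrinking shapes discussed here, returns class names as strings, and assumes that every aggregate sharing a frame must pass the allowlist for the segment-tree path to be used. The per-partition {{minPartitionRows}} fallback happens later, at {{prepare()}} time, and is not modeled.

```python
from dataclasses import dataclass
from typing import Sequence


@dataclass(frozen=True)
class WindowAgg:
    """Hypothetical summary of one window aggregate; not a Spark class."""
    name: str
    declarative_with_merge: bool  # models "DeclarativeAggregate with mergeExpressions"
    has_filter: bool = False
    is_distinct: bool = False


def eligible_for_seg_tree(aggs: Sequence[WindowAgg], seg_tree_enabled: bool) -> bool:
    # Opt-in conf first, then the allowlist (assumed to apply to every aggregate on the frame).
    return seg_tree_enabled and all(
        a.declarative_with_merge and not a.has_filter and not a.is_distinct for a in aggs
    )


def choose_frame(shrinking: bool, aggs: Sequence[WindowAgg], seg_tree_enabled: bool) -> str:
    """shrinking=True models `<lower> AND UNBOUNDED FOLLOWING`; False models a moving frame."""
    if eligible_for_seg_tree(aggs, seg_tree_enabled):
        if shrinking:
            return "SegmentTreeWindowFunctionFrame(ubound=None)"
        return "SegmentTreeWindowFunctionFrame(ubound=Some(ub))"
    if shrinking:
        return "UnboundedFollowingWindowFunctionFrame"
    return "SlidingWindowFunctionFrame"


SUM = WindowAgg("sum", declarative_with_merge=True)
COLLECT_LIST = WindowAgg("collect_list", declarative_with_merge=False)
```

With the conf left at its default, a shrinking {{SUM}} keeps the legacy frame; enabling it routes the same frame to the segment tree, while adding {{collect_list}} to the frame falls back again.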
> h2. Benchmark (prototype)
> WindowBenchmark-style measurement on EC2 c5.4xlarge (Intel Xeon 8259CL @
> 2.50GHz, OpenJDK 17.0.19+10), single-partition shrinking-frame {{SUM(v) OVER
> (ORDER BY id ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)}}:
> || N || naive (best time) || segtree (best time) || speedup ||
> | 5K | 620 ms | 73 ms | 8.5X |
> | 10K | 2 471 ms | 110 ms | 22.5X |
> | 25K | 14 259 ms | 119 ms | 119.3X |
> | 50K | 57 022 ms | 181 ms | 314.2X |
> | 100K | -- | 269 ms | -- |
> | 200K | -- | 480 ms | -- |
> Naive at N=100K and 200K was skipped (extrapolated cost ~4 min / ~16 min per
> iteration, respectively); the segtree path stays sub-second. The naive curve is
> clean O(N^2) (5x N -> ~23x time at 50K vs 10K); the segtree curve grows roughly
> linearly (2x N from 100K to 200K -> ~1.8x time), consistent with O(log N) per-row cost.
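The growth ratios, speedups, and naive extrapolation can be recomputed directly from the best times in the table above. The extrapolation assumes pure O(N^2) scaling from the 50K best time.

```python
naive_ms = {5_000: 620, 10_000: 2_471, 25_000: 14_259, 50_000: 57_022}
segtree_ms = {5_000: 73, 10_000: 110, 25_000: 119, 50_000: 181,
              100_000: 269, 200_000: 480}

# Speedup per measured size (naive best time / segtree best time).
speedup = {n: naive_ms[n] / segtree_ms[n] for n in naive_ms}

naive_growth = naive_ms[50_000] / naive_ms[10_000]            # 5x N; pure N^2 would give 25x
segtree_growth = segtree_ms[200_000] / segtree_ms[100_000]    # 2x N


def extrapolate_naive_minutes(n: int, base_n: int = 50_000) -> float:
    """Scale the base best time by (n / base_n)^2 and convert ms to minutes."""
    return naive_ms[base_n] * (n / base_n) ** 2 / 60_000


for n in (100_000, 200_000):
    print(n, round(extrapolate_naive_minutes(n), 1))
```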
> Per-aggregate at N=10K (other aggregates besides SUM):
> || Aggregate || naive || segtree || speedup ||
> | SUM | 2 471 ms | 110 ms | 22.5X |
> | MIN | 2 417 ms | 215 ms | 11.2X |
> | MAX | 2 396 ms | 228 ms | 10.5X |
> | COUNT | 2 203 ms | 80 ms | 27.4X |
> | AVG | 2 886 ms | 84 ms | 34.5X |
> h2. Workload relevance
> Shrinking frames are common in retention / cohort / revenue analytics:
> "remaining lifetime value at this row", "future churn risk", "monthly revenue
> from here forward". For partitions of 100K rows or more (a single user's
> lifetime in a transactional table), the legacy O(N^2) path is infeasible.
> h2. Test surface
> A new {{UnboundedFollowingSegmentTreeSuite}} mirrors
> {{SegmentTreeWindowFunctionSuite}} structure: 26 oracle-vs-naive equivalence
> tests over CURRENT ROW / N PRECEDING / N FOLLOWING lower bounds, ROWS / RANGE
> frame types, single-row + empty partition + small-partition fallback,
> all-NULL / mixed-NULL / NaN+Infinity,
> Int/Long/Double/Decimal/String/Date/Timestamp, multi-aggregate shared frame,
> collect_list fallback (non-DeclarativeAggregate), and DISTINCT analyzer
> rejection. All 26 pass. Existing 41 sliding tests in
> {{SegmentTreeWindowFunctionSuite}} also still pass, confirming the unified
> rewrite preserves sliding-frame semantics.
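An oracle-vs-naive equivalence check in the spirit of the suite described above can be sketched in a few lines. This is an illustrative Python model, not {{UnboundedFollowingSegmentTreeSuite}}: it covers ROWS frames with CURRENT ROW, k PRECEDING, and k FOLLOWING lower bounds, empty and all-NULL partitions, and models SQL MIN's NULL skipping with {{None}}. NaN ordering and RANGE frames are not modeled, and all function names are hypothetical.

```python
import random
from typing import List, Optional

Value = Optional[float]


def null_min(a: Value, b: Value) -> Value:
    # SQL MIN ignores NULLs, so None is also the identity for this merge.
    if a is None:
        return b
    if b is None:
        return a
    return min(a, b)


def build_tree(values: List[Value]) -> List[Value]:
    n = len(values)
    tree: List[Value] = [None] * n + list(values)  # leaves at [n, 2n)
    for i in range(n - 1, 0, -1):
        tree[i] = null_min(tree[2 * i], tree[2 * i + 1])
    return tree


def range_min(tree: List[Value], n: int, lo: int, hi: int) -> Value:
    res: Value = None
    lo, hi = lo + n, hi + n
    while lo < hi:
        if lo & 1:
            res = null_min(res, tree[lo])
            lo += 1
        if hi & 1:
            hi -= 1
            res = null_min(res, tree[hi])
        lo //= 2
        hi //= 2
    return res


def naive_min(values: List[Value], lo: int) -> Value:
    res: Value = None
    for v in values[lo:]:
        res = null_min(res, v)
    return res


def check_shrinking(values: List[Value], offset: int) -> None:
    """offset 0 = CURRENT ROW, -k = k PRECEDING, +k = k FOLLOWING (ROWS frames)."""
    n = len(values)
    tree = build_tree(values)
    for i in range(n):
        lo = min(n, max(0, i + offset))
        assert range_min(tree, n, lo, n) == naive_min(values, lo), (values, offset, i)


rng = random.Random(42)
for _ in range(200):
    size = rng.randint(0, 40)  # includes empty and single-row partitions
    vals = [None if rng.random() < 0.3 else float(rng.randint(-50, 50)) for _ in range(size)]
    for off in (-3, -1, 0, 1, 3):
        check_shrinking(vals, off)
```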
> h2. Out of scope
> * Public API / SQL surface changes.
> * Distinct or filter-clause window aggregates (analyzer-rejected today).
> * {{ImperativeAggregate}} or UDAF window functions (allowlist excludes them;
> fall back to legacy).
> * Variant data type or other shrinking-frame-specific allowlist additions.
> Follows up SPARK-56546.