[ https://issues.apache.org/jira/browse/SPARK-56546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kent Yao reassigned SPARK-56546:
--------------------------------

    Assignee: Kent Yao

> [SQL] Optimize sliding window aggregation with segment tree
> -----------------------------------------------------------
>
>                 Key: SPARK-56546
>                 URL: https://issues.apache.org/jira/browse/SPARK-56546
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 4.2.0
>            Reporter: Kent Yao
>            Assignee: Kent Yao
>            Priority: Major
>
> h3. Background
> Spark's {{SlidingWindowFunctionFrame}} (sql/core, 
> {{WindowFunctionFrame.scala}}) recomputes the aggregate over the entire frame 
> for every output row in O(n*W) time, where n is the partition size and W is 
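> the frame width. Per output row the cost grows linearly with W, so total work 
> approaches quadratic when W is a sizable fraction of n. For sliding-window 
> workloads (e.g., rolling MAX, AVG, STDDEV over sensor time-series or financial 
> tick data), this recomputation dominates wall-clock time once the frame is 
> wider than a few hundred rows.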
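To make the cost model concrete, here is a minimal Scala sketch of recompute-per-row evaluation for a ROWS frame. It is not Spark's {{SlidingWindowFunctionFrame}}; the object and method names are hypothetical, and the merge counter only illustrates that every output row pays for every row in its frame, O(n*W) merges in total.

```scala
// Hypothetical sketch of recompute-per-row evaluation for a ROWS frame
// [i - preceding, i + following]; not Spark's actual frame implementation.
object NaiveSlidingFrame {
  /** Returns the per-row aggregates and the total number of merge calls made. */
  def slidingAggregate[T](rows: IndexedSeq[T], preceding: Int, following: Int)(zero: T)(
      merge: (T, T) => T): (IndexedSeq[T], Long) = {
    var merges = 0L
    val out = rows.indices.map { i =>
      // Clamp the frame to the partition boundaries.
      val lo = math.max(0, i - preceding)
      val hi = math.min(rows.length - 1, i + following)
      // Rebuild the aggregate from scratch: one merge per row in the frame.
      var acc = zero
      var j = lo
      while (j <= hi) {
        acc = merge(acc, rows(j))
        merges += 1
        j += 1
      }
      acc
    }
    (out, merges)
  }
}
```

For example, {{preceding = 1000, following = 0}} (W=1001) on a 2,000-row partition already performs 1,501,500 merges.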
> Prior art: Leis et al., "Efficient Processing of Window Functions in 
> Analytical SQL Queries" (VLDB 2015) -- a segment tree per partition gives 
> O(n*log(W)) for any mergeable (associative) aggregate, including 
> non-invertible ones (MIN, MAX, generic {{DeclarativeAggregate}} with 
> {{mergeExpressions}}). DuckDB and Umbra use the same technique.
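A minimal sketch of the segment-tree idea, assuming only an associative {{merge}} with an identity element {{zero}}. The class is hypothetical and keeps everything on-heap, unlike the proposed block-chunked {{WindowSegmentTree}}. Internal nodes hold pre-merged values, so a contiguous frame of width W is answered by the bottom-up traversal in O(log W) merges; left and right partial results are kept separate so that non-commutative merges still combine in row order.

```scala
import scala.reflect.ClassTag

// Hypothetical on-heap segment tree over an associative merge with identity `zero`.
final class SegmentTree[T: ClassTag](values: IndexedSeq[T], zero: T, merge: (T, T) => T) {
  // Number of leaves, rounded up to a power of two; padding leaves hold `zero`.
  private val size: Int = {
    var s = 1
    while (s < values.length) s <<= 1
    s
  }
  // tree(1) is the root; node k has children 2k and 2k+1; leaves are tree(size until 2*size).
  private val tree: Array[T] = Array.fill(2 * size)(zero)
  values.indices.foreach(i => tree(size + i) = values(i))
  // Internal nodes store pre-merged aggregates of their children.
  (size - 1 until 0 by -1).foreach(k => tree(k) = merge(tree(2 * k), tree(2 * k + 1)))

  /** Aggregate of values(lo..hi), inclusive, using O(log(hi - lo + 1)) merges. */
  def query(lo: Int, hi: Int): T = {
    var l = lo + size
    var r = hi + size + 1 // half-open [l, r) over leaf positions
    var leftAcc = zero    // covers rows to the left of the unresolved range
    var rightAcc = zero   // covers rows to the right of the unresolved range
    while (l < r) {
      if ((l & 1) == 1) { leftAcc = merge(leftAcc, tree(l)); l += 1 }
      if ((r & 1) == 1) { r -= 1; rightAcc = merge(tree(r), rightAcc) }
      l >>= 1
      r >>= 1
    }
    merge(leftAcc, rightAcc)
  }
}
```

In this sketch the tree is built once per partition with O(n) merges, and each output row then issues one {{query}} with its frame bounds.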
> h3. Proposal
> Introduce a segment-tree-backed {{WindowFunctionFrame}} path for bounded 
> sliding ROWS and RANGE frames over {{DeclarativeAggregate}} functions 
> (non-invertible and invertible alike). The new path is gated by a default-off 
> SQLConf so it is opt-in and fully reversible.
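For RANGE frames, each row's value-based bounds must first be turned into a row-index interval that a tree query can consume. Assuming a single ascending, non-null ORDER BY key (RANGE with multi-column ORDER BY falls back), this takes two binary searches per row, and ties land inside the frame automatically. The helper below is hypothetical and does not handle overflow of {{k - preceding}} or {{k + following}}.

```scala
// Hypothetical helper: turn a RANGE frame into an inclusive row-index interval,
// assuming a single ascending ORDER BY key with no NULLs (overflow not handled).
object RangeFrameBounds {
  // First index j with keys(j) >= target, or keys.length if there is none.
  private def lowerBound(keys: IndexedSeq[Long], target: Long): Int = {
    var lo = 0
    var hi = keys.length
    while (lo < hi) {
      val mid = (lo + hi) >>> 1
      if (keys(mid) < target) lo = mid + 1 else hi = mid
    }
    lo
  }

  // First index j with keys(j) > target, or keys.length if there is none.
  private def upperBound(keys: IndexedSeq[Long], target: Long): Int = {
    var lo = 0
    var hi = keys.length
    while (lo < hi) {
      val mid = (lo + hi) >>> 1
      if (keys(mid) <= target) lo = mid + 1 else hi = mid
    }
    lo
  }

  /** Bounds of RANGE BETWEEN `preceding` PRECEDING AND `following` FOLLOWING for row i. */
  def bounds(keys: IndexedSeq[Long], i: Int, preceding: Long, following: Long): (Int, Int) = {
    val k = keys(i)
    // Every row whose key lies in [k - preceding, k + following] is included, ties too.
    (lowerBound(keys, k - preceding), upperBound(keys, k + following) - 1)
  }
}
```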
> Key elements:
> * New {{WindowSegmentTree}} data structure (block-chunked; leaves = cached 
> {{UnsafeRow}} blocks; internal nodes = pre-merged aggregate buffers).
> * Integration via {{SegmentTreeWindowFunctionFrame}}, selected by 
> {{WindowEvaluatorFactoryBase}} when the feature flag is on, the frame is 
> sliding, and the aggregate is mergeable.
> * Memory integration with {{TaskMemoryManager}} (block-chunked LRU cache, 
> spill via {{ExternalAppendOnlyUnsafeRowArray}}), so it does not regress 
> low-memory operators.
> * Observability: two new SQLMetrics ({{numSegmentTreeFrames}}, 
> {{numSegmentTreeFallbackFrames}}) and EXPLAIN visibility.
> * Fallback: any unsupported case (ImperativeAggregate, UDAF via ScalaUDAF, 
> RANGE with multi-column ORDER BY, etc.) routes to the existing 
> {{SlidingWindowFunctionFrame}} with no behavioral change.
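The selection and fallback rules listed above can be modelled as a small decision function. This is a hypothetical sketch: none of these types exist in Spark, and the real check in {{WindowEvaluatorFactoryBase}} presumably works on Spark's own frame and expression classes rather than on enums like these.

```scala
// Hypothetical model of the frame-selection rules; these types do not exist in Spark.
object FrameSelectionSketch {
  sealed trait AggKind
  case object DeclarativeWithMerge extends AggKind // DeclarativeAggregate with mergeExpressions
  case object Imperative extends AggKind           // e.g. ImperativeAggregate, ScalaUDAF

  sealed trait FrameType
  case object RowsFrame extends FrameType
  case object RangeFrame extends FrameType

  // boundedBothSides = neither bound is UNBOUNDED, i.e. a sliding frame.
  final case class FrameShape(frameType: FrameType, boundedBothSides: Boolean, orderByColumns: Int)

  sealed trait FramePath
  case object SegmentTreePath extends FramePath // new segment-tree frame
  case object ExistingPath extends FramePath    // unchanged pre-feature behaviour

  def choose(flagEnabled: Boolean, agg: AggKind, shape: FrameShape): FramePath = {
    val aggSupported = agg == DeclarativeWithMerge
    val shapeSupported = shape.boundedBothSides && (shape.frameType match {
      case RowsFrame  => true
      case RangeFrame => shape.orderByColumns == 1 // multi-column RANGE falls back
    })
    if (flagEnabled && aggSupported && shapeSupported) SegmentTreePath else ExistingPath
  }
}
```

Keeping the decision in one pure function makes the "flag off means the exact old path" guarantee easy to test exhaustively.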
> h3. Expected benefit
> Preliminary in-tree {{WindowBenchmark}} numbers (GHA fork run, JDK 17/21/25, 
> 2M rows unless noted):
> * MIN / MAX / SUM / AVG / COUNT: ~9-10x at W=1001
> * STDDEV_SAMP: ~18x at W=1001 (stress case)
> * W sweep at W=4001: ~31x (the speedup grows with W, consistent with the expected O(W/log W) ratio)
> * String / spill path: ~17x
> Final numbers will be refreshed from the committed 
> {{sql/core/benchmarks/WindowBenchmark-results.txt}} once the current GHA 
> matrix lands.
> h3. Scope
> Phase 1 (this umbrella's first wave of PRs):
> * Bounded ROWS and RANGE sliding frames
> * {{DeclarativeAggregate}} with {{mergeExpressions}}
> * Default-off SQLConf, fallback on any unsupported shape
> Phase 2 (separate JIRAs, not blocking):
> * FAME: within-block prefix/suffix cumulative aggregates
> * Adaptive block size, fanout tuning
> * Invertible fast-path for SUM/COUNT/AVG
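For the Phase 2 invertible fast-path, the usual technique is to add each row as it enters the frame and subtract it as it leaves, which gives O(1) work per row for SUM/COUNT/AVG. Below is a hypothetical sketch for AVG over {{ROWS BETWEEN p PRECEDING AND CURRENT ROW}}, with {{None}} standing in for SQL NULL. It uses exact {{Long}} arithmetic (overflow not handled), because subtracting floating-point values can accumulate rounding error.

```scala
// Hypothetical sketch of an invertible sliding AVG; None stands in for SQL NULL.
object InvertibleSlidingAvg {
  /** AVG over ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW, O(1) work per row. */
  def slidingAvg(values: IndexedSeq[Option[Long]], preceding: Int): IndexedSeq[Option[Double]] = {
    var sum = 0L   // running SUM of non-null values in the frame (overflow not handled)
    var count = 0L // running COUNT of non-null values in the frame
    values.indices.map { i =>
      // The current row enters the frame.
      values(i).foreach { v => sum += v; count += 1 }
      // The row that just fell out of the frame is removed by inverting its contribution.
      val leaving = i - preceding - 1
      if (leaving >= 0) values(leaving).foreach { v => sum -= v; count -= 1 }
      // AVG is NULL when the frame holds no non-null values.
      if (count == 0) None else Some(sum.toDouble / count)
    }
  }
}
```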
> h3. Safety / rollback
> * SQLConf {{spark.sql.window.segmentTree.enabled}} defaults to {{false}}. 
> Setting it back to {{false}} restores the exact pre-feature execution path.
> * All existing window tests must stay green with the flag on and off; a new 
> {{SegmentTreeWindowFunctionSuite}} covers edge cases (NULL, NaN, Decimal 
> overflow, Binary, UDAF fallback, RANGE with ties / NULL keys, small-partition 
> fallback, spill).
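Because the tree groups merges differently from a left-to-right scan, every merge function must give the same answer for edge cases under any grouping. Below is a hypothetical MAX merge covering two of the listed cases: NULL inputs are ignored, and NaN follows Spark's documented rule that NaN is larger than any other numeric value.

```scala
// Hypothetical MAX merge for partial buffers; None means "no non-null input yet".
object NullAndNaNAwareMax {
  def merge(a: Option[Double], b: Option[Double]): Option[Double] = (a, b) match {
    case (None, other) => other // a NULL-only side contributes nothing
    case (other, None) => other
    case (Some(x), Some(y)) =>
      // NaN is treated as larger than every other value, so it wins the max.
      if (x.isNaN || y.isNaN) Some(Double.NaN) else Some(math.max(x, y))
  }
}
```

A suite can then fold the same inputs left-to-right and in tree-shaped groupings and require identical results.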
> h3. Related work
> * Leis et al., VLDB 2015: "Efficient Processing of Window Functions in 
> Analytical SQL Queries".
> * DuckDB segment-tree window implementation.
> * Velox PR #9049 (partial FAME attempt; documented pitfalls informed this 
> design).
> h3. Subtasks
> Subtasks will be filed as the implementation splits into PR-sized units (data 
> structure, frame integration, memory/spill, metrics, RANGE support, 
> benchmark). A split-strategy document will accompany the first subtask.
> h3. Design doc
> A public design doc summary will be linked here before the first subtask PR 
> is opened.



