[ https://issues.apache.org/jira/browse/SPARK-57220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Anupam Yadav updated SPARK-57220:
---------------------------------
    Description: 
h2. Background

SPARK-56546 introduced {{SegmentTreeWindowFunctionFrame}} for non-invertible 
*sliding* aggregates, replacing the O(N*W) full-recompute path with O(N log W). 
The same data structure can answer arbitrary {{[lower, upper)}} queries, 
including the case where the upper bound is the partition end.

This JIRA extends the existing segment-tree implementation to the *shrinking* 
frame shape, i.e. {{... ROWS/RANGE BETWEEN <lower> AND UNBOUNDED FOLLOWING}}.
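
To make the frame shape concrete, here is a minimal Python sketch of how each output row maps to a half-open index range {{[lower, n)}} when the upper bound is UNBOUNDED FOLLOWING. It is not Spark code: {{rows_lower}} and {{range_lower}} are hypothetical helpers, and the RANGE case assumes an ascending {{ORDER BY}} over non-NULL numeric keys. In both cases the lower bound is non-decreasing in the row index.

```python
from bisect import bisect_left


def rows_lower(i, n, kind, offset=0):
    """First row of a ROWS frame '<kind> AND UNBOUNDED FOLLOWING'; frame is [lower, n)."""
    if kind == "current":        # CURRENT ROW
        return i
    if kind == "preceding":      # <offset> PRECEDING, clamped to the partition start
        return max(0, i - offset)
    if kind == "following":      # <offset> FOLLOWING; lower == n means an empty frame
        return min(n, i + offset)
    raise ValueError(kind)


def range_lower(keys, i, kind, offset=0):
    """First row of a RANGE frame; keys are assumed ascending, non-NULL and numeric."""
    if kind == "current":        # include every peer of row i
        target = keys[i]
    elif kind == "preceding":
        target = keys[i] - offset
    elif kind == "following":
        target = keys[i] + offset
    else:
        raise ValueError(kind)
    return bisect_left(keys, target)  # first index whose key >= target


keys = [1, 2, 2, 5, 9]
print([rows_lower(i, len(keys), "following", 2) for i in range(len(keys))])  # [2, 3, 4, 5, 5]
print([range_lower(keys, i, "preceding", 3) for i in range(len(keys))])      # [0, 0, 0, 1, 4]
```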

PR: https://github.com/apache/spark/pull/56291

h2. Current behaviour

For frames of the form {{... BETWEEN <lower> AND UNBOUNDED FOLLOWING}}, the 
dispatcher in {{WindowEvaluatorFactoryBase.scala}} (lines 282-289 on master) 
always selects {{UnboundedFollowingWindowFunctionFrame}}, which recomputes the 
suffix aggregate from scratch for every output row. The class scaladoc 
explicitly acknowledges the cost (WindowFunctionFrame.scala:636):

{quote}
This is a very expensive operator to use, O(n * (n - 1) / 2), because we need 
to maintain a buffer and must do full recalculation after each row.
{quote}
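
The quadratic cost can be reproduced with a simplified Python model of the legacy path. This is not the Scala class: NULL handling and row buffering are ignored, and {{naive_suffix}} is a hypothetical name. Seeding each frame with its first row and merging the remaining rows costs n - i - 1 merges for row i, i.e. n(n-1)/2 merges per partition, which matches the figure in the scaladoc.

```python
def naive_suffix(values, merge):
    """Model of the legacy path: rebuild the suffix aggregate for every row.

    Returns (results, merges), where merges counts calls to `merge`.
    """
    n = len(values)
    results, merges = [], 0
    for i in range(n):
        acc = values[i]                 # seed with the first row of the frame
        for j in range(i + 1, n):       # fold in every remaining row of the partition
            acc = merge(acc, values[j])
            merges += 1
        results.append(acc)
    return results, merges


res, merges = naive_suffix([3, 1, 4, 1, 5], lambda a, b: a + b)
print(res, merges)  # [14, 11, 10, 6, 5] 10
```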

The segtree path added by SPARK-56546 already builds the segment tree over the 
full partition in {{prepare()}} and supports {{queryInto(lower, upper, ...)}} 
for any subrange, but it is only wired into the moving-frame branch of the 
dispatcher (lines 291-331). Shrinking frames bypass it entirely, even though 
the upper bound is the trivial constant {{tree.size}}.
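
A minimal sketch of why the shrinking case fits the existing structure: a plain bottom-up segment tree answers any {{[lower, upper)}} range in O(log n), and a shrinking frame simply passes {{upper = n}}. This is Python, not Spark's block-chunked implementation; {{query}} is a hypothetical stand-in for {{queryInto}}, and values are assumed non-NULL because {{None}} is used as the "no result yet" marker. Left and right partial results are kept separately so that a non-commutative merge still combines rows in order.

```python
class SegmentTree:
    """Bottom-up segment tree over any associative merge (illustrative only)."""

    def __init__(self, values, merge):
        self.n = len(values)
        self.merge = merge
        # Leaves live at tree[n:2n]; internal node p covers children 2p and 2p+1.
        self.tree = [None] * (2 * self.n)
        self.tree[self.n:] = values
        for p in range(self.n - 1, 0, -1):
            self.tree[p] = merge(self.tree[2 * p], self.tree[2 * p + 1])

    def query(self, lower, upper):
        """Aggregate of values[lower:upper]; None for an empty range."""
        left = right = None
        lo, hi = lower + self.n, upper + self.n
        while lo < hi:
            if lo & 1:  # lo is a right child: take it and step past it
                left = self.tree[lo] if left is None else self.merge(left, self.tree[lo])
                lo += 1
            if hi & 1:  # hi - 1 is a left child: take it on the right side
                hi -= 1
                right = self.tree[hi] if right is None else self.merge(self.tree[hi], right)
            lo //= 2
            hi //= 2
        if left is None:
            return right
        if right is None:
            return left
        return self.merge(left, right)


# Shrinking frame: every row queries [i, n), i.e. upper is the partition end.
vals = [3, 1, 4, 1, 5]
t = SegmentTree(vals, min)
print([t.query(i, t.n) for i in range(len(vals))])  # [1, 1, 1, 1, 5]
```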

h2. Proposal

Extend {{SegmentTreeWindowFunctionFrame}} to also handle the shrinking case. 
Three changes:

# Make {{ubound}} an {{Option[BoundOrdering]}} on the constructor. {{None}} 
means shrinking (upper is the partition end); {{Some(ub)}} preserves the 
existing sliding behaviour. Add a {{fallbackFactory: () => 
WindowFunctionFrame}} so the small-partition path can produce 
{{SlidingWindowFunctionFrame}} for sliding and 
{{UnboundedFollowingWindowFunctionFrame}} for shrinking.
# Add a shrinking-frame branch to the dispatcher in 
{{WindowEvaluatorFactoryBase}} that consults the same {{eligibleForSegTree}} 
gate and, on success, constructs a {{SegmentTreeWindowFunctionFrame}} with 
{{ubound = None}}.
# Use {{cacheHint = Some(2)}} explicitly at the shrinking call site instead of 
routing through {{estimateMaxCachedBlocks}}, which would silently default to 
{{Some(8)}} because no {{IntegerLiteral}} upper-bound case matches. The 
shrinking access pattern needs at most 2 cached block-levels regardless of 
partition size: middle blocks of {{[lower, n)}} are answered directly from the 
always-resident {{blockAggregates}}, never via the per-block LRU, and the 
lower-edge cursor advances monotonically with the output row, so each partial 
block is needed for at most {{blockSize}} consecutive queries and is then never 
revisited.
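
The cache argument in item 3 can be illustrated with a toy model. All names are hypothetical: per-block detail is a raw slice reduced linearly rather than a per-block tree, {{block_aggs}} plays the role of {{blockAggregates}}, and {{cache_blocks}} stands in for {{cacheHint}}. In this model, {{upper = None}} means the partition end, so the trailing block is always fully covered and, like every middle block, is read from {{block_aggs}}; only the block holding the lower edge needs its detail, and because the lower edge only moves forward, each block is loaded at most once.

```python
from collections import OrderedDict
from functools import reduce


class BlockChunkedModel:
    """Toy block-chunked aggregate with a small LRU for per-block detail."""

    def __init__(self, values, block_size, merge, cache_blocks=2):
        self.values, self.n, self.bs, self.merge = values, len(values), block_size, merge
        self.cache_blocks = cache_blocks
        self.cache = OrderedDict()
        self.loads = 0  # number of times per-block detail had to be (re)loaded
        # One aggregate per block, always resident.
        self.block_aggs = [reduce(merge, values[s:s + block_size])
                           for s in range(0, self.n, block_size)]

    def _detail(self, b):
        if b in self.cache:
            self.cache.move_to_end(b)
        else:
            self.loads += 1
            self.cache[b] = self.values[b * self.bs:(b + 1) * self.bs]
            if len(self.cache) > self.cache_blocks:
                self.cache.popitem(last=False)  # evict the least recently used block
        return self.cache[b]

    def _block_part(self, b, lo, hi):
        """Aggregate of values[lo:hi], where [lo, hi) lies inside block b."""
        start, end = b * self.bs, min((b + 1) * self.bs, self.n)
        if lo == start and hi == end:
            return self.block_aggs[b]  # whole block covered: no detail needed
        return reduce(self.merge, self._detail(b)[lo - start:hi - start])

    def query(self, lower, upper=None):
        upper = self.n if upper is None else upper  # None = shrinking frame
        if lower >= upper:
            return None
        first, last = lower // self.bs, (upper - 1) // self.bs
        if first == last:
            return self._block_part(first, lower, upper)
        acc = self._block_part(first, lower, (first + 1) * self.bs)
        for b in range(first + 1, last):
            acc = self.merge(acc, self.block_aggs[b])
        return self.merge(acc, self._block_part(last, last * self.bs, upper))


vals = list(range(1, 11))
m = BlockChunkedModel(vals, block_size=4, merge=lambda a, b: a + b)
print([m.query(i) for i in range(len(vals))], m.loads)
# [55, 54, 52, 49, 45, 40, 34, 27, 19, 10] 3
```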

All other infrastructure (eligibility, build, spill via {{TaskMemoryManager}}, 
the {{minPartitionRows}} fallback, the SQLMetrics for segtree-frames-built / 
fallbacks) is reused as-is.

h2. Behaviour

* Same opt-in conf: {{spark.sql.window.segmentTree.enabled}} (default 
{{false}}; shrinking frames stay on the legacy path unless it is set to {{true}}).
* Same eligibility allowlist (DeclarativeAggregate with {{mergeExpressions}}, 
no FILTER, no DISTINCT).
* Same fallback for partitions below 
{{spark.sql.window.segmentTree.minPartitionRows}}, but to 
{{UnboundedFollowingWindowFunctionFrame}} instead of 
{{SlidingWindowFunctionFrame}}.
* No analyzer / SQL grammar / plan-shape changes.
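
The rules above can be summarised as a small decision model. This is an illustrative Python sketch with hypothetical names, not the dispatcher code; it assumes the conf and eligibility checks happen at dispatch time and that the {{minPartitionRows}} check happens per partition when the frame is prepared, with {{fallback_factory}} mirroring the proposed {{fallbackFactory}}.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class AggInfo:
    declarative: bool            # DeclarativeAggregate with mergeExpressions
    has_filter: bool = False
    is_distinct: bool = False


@dataclass
class SegTreeFramePlan:
    ubound: Optional[str]                  # None = shrinking (upper is the partition end)
    fallback_factory: Callable[[], str]    # frame used for small partitions


def eligible_for_segtree(aggs):
    return all(a.declarative and not a.has_filter and not a.is_distinct for a in aggs)


def plan_frame(enabled, aggs, ubound):
    """Dispatch-time choice; ubound=None models BETWEEN <lower> AND UNBOUNDED FOLLOWING."""
    legacy = ("UnboundedFollowingWindowFunctionFrame" if ubound is None
              else "SlidingWindowFunctionFrame")
    if enabled and eligible_for_segtree(aggs):
        return SegTreeFramePlan(ubound, lambda: legacy)
    return legacy


def frame_for_partition(plan, partition_rows, min_partition_rows):
    """Per-partition choice: partitions below the threshold use the fallback frame."""
    if isinstance(plan, SegTreeFramePlan):
        if partition_rows < min_partition_rows:
            return plan.fallback_factory()
        return "SegmentTreeWindowFunctionFrame"
    return plan
```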

h2. Benchmark

WindowBenchmark-style measurement on Linux x86_64 (Intel Xeon Platinum 8259CL @ 
2.50GHz, OpenJDK 17.0.19+10-LTS), single-partition shrinking-frame {{SUM(v) 
OVER (ORDER BY id ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)}}:

|| N || naive (best time) || segtree (best time) || speedup ||
| 5K   | 620 ms    | 73 ms  | 8.5X    |
| 10K  | 2 471 ms  | 110 ms | 22.5X   |
| 25K  | 14 259 ms | 119 ms | 119.3X  |
| 50K  | 57 022 ms | 181 ms | 314.2X  |
| 100K | --        | 269 ms | --      |
| 200K | --        | 480 ms | --      |

Naive runs at N=100K and 200K were skipped (extrapolated cost ~4 min / ~16 min 
per iteration, respectively); the segtree path stays sub-second. The naive 
curve is clean O(N^2) (5x N -> ~23x time at 50K vs 10K). Over the measured 
range, segtree total time grows sub-linearly (2x N from 100K to 200K -> 1.8x 
time), consistent with O(log N) work per row plus fixed per-run overhead.
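
The extrapolation and growth ratios quoted above follow directly from the table. A short Python check, with timings copied from the table and a hypothetical helper name:

```python
# Best-time measurements copied from the table above (milliseconds).
naive_ms = {5_000: 620, 10_000: 2_471, 25_000: 14_259, 50_000: 57_022}
segtree_ms = {5_000: 73, 10_000: 110, 25_000: 119, 50_000: 181,
              100_000: 269, 200_000: 480}


def extrapolate_quadratic(t_known_ms, n_known, n_target):
    """Scale a measured time assuming total cost grows as N^2."""
    return t_known_ms * (n_target / n_known) ** 2


for n in (100_000, 200_000):
    est_min = extrapolate_quadratic(naive_ms[50_000], 50_000, n) / 60_000
    print(f"naive N={n}: ~{est_min:.1f} min per iteration")

naive_ratio = naive_ms[50_000] / naive_ms[10_000]          # 5x N; N^2 predicts 25x
segtree_ratio = segtree_ms[200_000] / segtree_ms[100_000]  # 2x N
print(f"naive 50K vs 10K: {naive_ratio:.1f}x; segtree 200K vs 100K: {segtree_ratio:.2f}x")
```

This gives ~3.8 min and ~15.2 min for the skipped naive runs, 23.1x for naive 50K vs 10K, and 1.78x for segtree 200K vs 100K.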

Per-aggregate results at N=10K (SUM row repeated for reference):

|| Aggregate || naive || segtree || speedup ||
| SUM   | 2 471 ms | 110 ms | 22.5X |
| MIN   | 2 417 ms | 215 ms | 11.2X |
| MAX   | 2 396 ms | 228 ms | 10.5X |
| COUNT | 2 203 ms | 80 ms  | 27.4X |
| AVG   | 2 886 ms | 84 ms  | 34.5X |

h2. Workload relevance

Shrinking frames are common in retention / cohort / revenue analytics: 
"remaining lifetime value at this row", "future churn risk", "monthly revenue 
from here forward". For partitions of 100K rows or more (a single user's 
lifetime in a transactional table), the legacy O(N^2) path is infeasible.

h2. Test surface

{{UnboundedFollowingSegmentTreeSuite}} mirrors 
{{SegmentTreeWindowFunctionSuite}} structure: 26 oracle-vs-naive equivalence 
tests over CURRENT ROW / N PRECEDING / N FOLLOWING lower bounds, ROWS / RANGE 
frame types, single-row + empty partition + small-partition fallback, all-NULL 
/ mixed-NULL / NaN+Infinity, Int/Long/Double/Decimal/String/Date/Timestamp, 
multi-aggregate shared frame, collect_list fallback (non-DeclarativeAggregate), 
and DISTINCT analyzer rejection. All 26 pass.
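
The oracle-vs-naive pattern can be sketched in a few lines of Python. This is not the Scala suite: it covers only ROWS frames and a NULL-aware {{MIN}}, omits RANGE frames, NaN ordering and the type matrix, and every function name is hypothetical. The candidate is a bottom-up segment tree queried with {{[lower, n)}}; the oracle recomputes each frame from scratch.

```python
import random


def merge_min(a, b):
    """NULL-aware MIN merge: None (SQL NULL) acts as the identity."""
    if a is None:
        return b
    if b is None:
        return a
    return min(a, b)


def rows_lowers(n, kind, offset):
    """Lower bound per row for ROWS BETWEEN <kind> AND UNBOUNDED FOLLOWING."""
    if kind == "current":
        return list(range(n))
    if kind == "preceding":
        return [max(0, i - offset) for i in range(n)]
    return [min(n, i + offset) for i in range(n)]  # "following"


def naive_frames(values, lowers):
    """Oracle: recompute MIN over [lower, n) from scratch for every row."""
    out = []
    for lo in lowers:
        present = [v for v in values[lo:] if v is not None]
        out.append(min(present) if present else None)
    return out


def segtree_frames(values, lowers):
    """Candidate: bottom-up segment tree, one [lower, n) query per row."""
    n = len(values)
    tree = [None] * n + list(values)  # leaves at tree[n:2n]
    for p in range(n - 1, 0, -1):
        tree[p] = merge_min(tree[2 * p], tree[2 * p + 1])
    out = []
    for lower in lowers:
        acc, lo, hi = None, lower + n, 2 * n
        while lo < hi:  # MIN is commutative, so one accumulator suffices
            if lo & 1:
                acc = merge_min(acc, tree[lo])
                lo += 1
            if hi & 1:
                hi -= 1
                acc = merge_min(acc, tree[hi])
            lo //= 2
            hi //= 2
        out.append(acc)
    return out


def run_equivalence(trials=300, seed=0):
    rng = random.Random(seed)
    for _ in range(trials):
        n = rng.randint(0, 40)                   # includes empty and single-row partitions
        null_rate = rng.choice([0.0, 0.3, 1.0])  # no / mixed / all NULL
        values = [None if rng.random() < null_rate else rng.randint(-50, 50)
                  for _ in range(n)]
        kind = rng.choice(["current", "preceding", "following"])
        lowers = rows_lowers(n, kind, rng.randint(0, 5))
        assert segtree_frames(values, lowers) == naive_frames(values, lowers), (values, kind)
    return True


print(run_equivalence())  # True
```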

Full regression coverage (no failures):
* {{SegmentTreeWindowFunctionSuite}} (41 sliding tests)
* {{WindowSegmentTreeSuite}}, {{WindowSegmentTreePropertySuite}}, 
{{WindowSegmentTreeMemorySuite}}, {{SegmentTreeWindowMetricsSuite}}, 
{{WindowSegmentTreeAllowlistSuite}} (50 segtree-adjacent tests)
* {{DataFrameWindowFunctionsSuite}} (55 high-level window tests)

172 tests total, 0 failures; the passing sliding-frame suites confirm that the 
unified rewrite preserves sliding-frame semantics.

h2. Out of scope

* Public API / SQL surface changes.
* Distinct or filter-clause window aggregates (analyzer-rejected today).
* {{ImperativeAggregate}} or UDAF window functions (allowlist excludes them; 
fall back to legacy).
* Variant data type or other shrinking-frame-specific allowlist additions.

Follows up SPARK-56546.


> Extend block-chunked segment-tree window frame to shrinking frames (UNBOUNDED 
> FOLLOWING)
> ----------------------------------------------------------------------------------------
>
>                 Key: SPARK-57220
>                 URL: https://issues.apache.org/jira/browse/SPARK-57220
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 4.3.0
>            Reporter: Anupam Yadav
>            Priority: Major
>              Labels: pull-request-available
>


