yaooqinn commented on code in PR #55422:
URL: https://github.com/apache/spark/pull/55422#discussion_r3124361905


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/window/SegmentTreeWindowFunctionFrame.scala:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.window
+
+import org.apache.spark.TaskContext
+import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, FrameType, MutableProjection, RangeFrame, RowFrame, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Moving-frame window function frame backed by [[WindowSegmentTree]]. Produces
+ * the same outputs as [[SlidingWindowFunctionFrame]] for RowFrame or
+ * single-column RangeFrame moving frames whose aggregate functions are all
+ * [[DeclarativeAggregate]] with no FILTER/DISTINCT. For partitions below
+ * `spark.sql.window.segmentTree.minPartitionRows`, delegates to a wrapped
+ * [[SlidingWindowFunctionFrame]].
+ *
+ * See `docs/frame-integration-contract.md` Section 1 for the full contract.
+ *
+ * RANGE support (RANGE frame support): when `frameType == RangeFrame`, the per-row
+ * driver uses two forward-only cursors (`lowerIter` / `upperIter`) over the
+ * materialized `rowArray` to advance `lowerBound` / `upperBound`. Endpoints
+ * are monotone per partition (SQL RANGE semantics + sorted input), so total
+ * cursor work is O(n). The segment tree then answers `[lowerBound,
+ * upperBound)` in O(log n).
+ *
+ * @note Not thread-safe.
+ */
+private[window] final class SegmentTreeWindowFunctionFrame(
+    target: InternalRow,
+    processor: AggregateProcessor,
+    functions: Array[DeclarativeAggregate],
+    inputSchema: Seq[Attribute],
+    frameType: FrameType,
+    lbound: BoundOrdering,
+    ubound: BoundOrdering,
+    newMutableProjection: (Seq[Expression], Seq[Attribute]) => MutableProjection,
+    conf: SQLConf,
+    maxCachedBlocks: Option[Int],
+    taskMemoryManager: TaskMemoryManager,
+    numSegmentTreeFrames: Option[SQLMetric] = None,
+    numSegmentTreeFallbackFrames: Option[SQLMetric] = None)
+  extends WindowFunctionFrame with AutoCloseable {
+
+  require(frameType == RowFrame || frameType == RangeFrame,
+    s"SegmentTreeWindowFunctionFrame supports RowFrame or RangeFrame, got $frameType")
+
+  private[this] val fallback =
+    new SlidingWindowFunctionFrame(target, processor, lbound, ubound)
+  private[this] var tree: WindowSegmentTree = _
+
+  // ---- RowFrame-only driver state ----
+  // `boundIter` advances `upperBound` one row at a time (sliding admit loop);
+  // the lower bound is inferred from pure index arithmetic under RowFrame.
+  private[this] var boundIter: Iterator[UnsafeRow] = _
+  private[this] var nextRow: UnsafeRow = _
+
+  // ---- RangeFrame-only driver state ----
+  // Two cursors over `rowArray`; `lowerRow` / `upperRow` hold the currently-
+  // buffered head of each cursor (pre-fetched in `prepare` -- fix so
+  // `RangeBoundOrdering.compare` is never called with a null row on round 0).
+  //
+  // Spill-safety invariant: when `rowArray` spills, its iterator reuses a
+  // single `UnsafeRow` object whose pointer is rebound on each `next()`.
+  // That is tolerated here because the cursor is **read-before-advance**:
+  // each `writeRange` loop iteration reads `lowerRow`/`upperRow` for
+  // comparison before calling `getNextOrNull(...)`. Between `write()`
+  // calls the pointer's content is stable (this frame is the sole consumer
+  // of each iterator). DO NOT cache a historical row into a separate field
+  // without an explicit `.copy()`; the shared reusable UnsafeRow would
+  // silently mutate.
+  private[this] var lowerIter: Iterator[UnsafeRow] = _
+  private[this] var upperIter: Iterator[UnsafeRow] = _
+  private[this] var lowerRow: UnsafeRow = _
+  private[this] var upperRow: UnsafeRow = _
+
+  // Shared endpoints: monotone across `write()` calls within a partition.
+  private[this] var lowerBound: Int = 0
+  private[this] var upperBound: Int = 0
+
+  /**
+   * Runtime dispatch flag: when `true`, `write()`, `currentLowerBound()`,
+   * and `currentUpperBound()` delegate to the wrapped
+   * [[SlidingWindowFunctionFrame]] (small-partition path). Set by
+   * `prepare()` based on partition size vs.
+   * `spark.sql.window.segmentTree.minPartitionRows`.
+   */
+  private[window] var fallbackUsed: Boolean = false
+
+  // Register close() once per frame instance so the tree's block cache and
+  // any open row-array iterators are released when the task completes.
+  // Keeping the registration here (vs. inside the factory closure) avoids
+  // duplicate listeners when the factory is invoked multiple times per task.
+  {
+    val tc = TaskContext.get()
+    if (tc != null) {
+      tc.addTaskCompletionListener[Unit](_ => close())
+    }
+  }
+
+  override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
+    // First-call hygiene: release prior tree/iterators/cursors before building
+    // the new tree. `prepare` is called exactly once per (partition, frame) by
+    // `WindowPartitionEvaluator.fetchNextPartition` (the sole call site); this
+    // mirrors `SlidingWindowFunctionFrame`, which has no guard. Counters bump
+    // unconditionally on the successful branch.
+    if (tree != null) {
+      tree.close()
+      tree = null
+    }
+    closeIters()
+    nextRow = null
+    lowerRow = null
+    upperRow = null
+    lowerBound = 0
+    upperBound = 0
+    if (rows.length < conf.windowSegmentTreeMinPartitionRows) {
+      fallbackUsed = true
+      fallback.prepare(rows)
+      // Count only on the successful fallback path: if `fallback.prepare`
+      // throws, the counter is not bumped.
+      numSegmentTreeFallbackFrames.foreach(_ += 1)
+      return
+    }
+    fallbackUsed = false
+    tree = new WindowSegmentTree(
+      functions,
+      inputSchema,
+      newMutableProjection,
+      fanout = conf.windowSegmentTreeFanout,
+      blockSize = conf.windowSegmentTreeBlockSize,
+      maxCachedBlocks = maxCachedBlocks,
+      taskMemoryManager = taskMemoryManager)
+    // Build first (drains rows into the tree's internal row array), then
+    // open fresh iterator(s) for per-row bound advancement.
+    tree.build(rows.generateIterator())
+    // Count only on the successful segtree path: if `tree.build` throws
+    // (e.g. OOM during block allocation), the counter is not bumped.
+    numSegmentTreeFrames.foreach(_ += 1)
+    frameType match {
+      case RowFrame =>
+        boundIter = rows.generateIterator()
+        nextRow = WindowFunctionFrame.getNextOrNull(boundIter)
+      case RangeFrame =>
+        lowerIter = rows.generateIterator()
+        upperIter = rows.generateIterator()
+        // Pre-seed cursor heads so `RangeBoundOrdering.compare` (which
+        // projects the input row to a scalar) never dereferences null on
+        // round 0. Either may legitimately be null if `rows` is
+        // empty; the advance loops' `!= null` / `< upperBound` guards
+        // handle that.
+        lowerRow = WindowFunctionFrame.getNextOrNull(lowerIter)
+        upperRow = WindowFunctionFrame.getNextOrNull(upperIter)
+    }
+  }
+
+  override def write(index: Int, current: InternalRow): Unit = {
+    if (fallbackUsed) {
+      fallback.write(index, current)
+      return
+    }
+    frameType match {
+      case RowFrame => writeRow(index, current)
+      case RangeFrame => writeRange(index, current)
+    }
+  }
+
+  private def writeRow(index: Int, current: InternalRow): Unit = {

Review Comment:
   The duplication is intentional and documented in `f8109f1e77b`: I added a
comment on `writeRow`/`writeRange` noting that they mirror
`SlidingWindowFunctionFrame.write`'s `(lowerBound, upperBound)`
monotone-cursor invariant, but use admit-then-drop (there is no buffer to
maintain) instead of drop-then-admit.
   
   Equivalence is guarded by `SegmentTreeWindowFunctionSuite`'s flag-on/off
tests (`checkRangeEquivalence`, `feature flag off ...`, fallback suite), which
compare segtree output against the Sliding baseline for identical queries, so
any future drift in Sliding's boundary semantics will fail these tests.
   
   I considered extracting a shared `MonotoneAdmitDropCursor`, but the two
frames diverge on state (Sliding maintains an `ArrayDeque`, segtree keeps only
cursors) and on admit-branch semantics (Sliding fuses admit+drop via
`if (lbound.compare…) lowerBound += 1 else buffer.add`, segtree splits them). A
shared helper would need ≥2 callback hooks and would touch the Sliding hot
path, which seems disproportionate for this PR. Happy to revisit if you'd
prefer.
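
For readers following along, here is a minimal standalone sketch of the admit-then-drop monotone-cursor idea discussed above. It is not code from the PR: `MonotoneCursorSketch`, `rangeFrames`, and `bruteForce` are hypothetical names, and it assumes plain sorted `Long` keys with non-negative offsets in place of Spark's `BoundOrdering` and `UnsafeRow` machinery. The brute-force reference loosely plays the role that the Sliding baseline plays in the equivalence tests.

```scala
// Hypothetical illustration only; none of these names exist in Spark.
object MonotoneCursorSketch {

  /** For each row i of a sorted key array, returns the half-open frame
    * [lower, upper) of rows whose key lies in
    * [keys(i) - preceding, keys(i) + following].
    * Assumes `keys` is sorted ascending and both offsets are >= 0. */
  def rangeFrames(keys: Array[Long], preceding: Long, following: Long): Array[(Int, Int)] = {
    var lower = 0 // forward-only cursor, never moves backwards
    var upper = 0 // forward-only cursor, never moves backwards
    val out = new Array[(Int, Int)](keys.length)
    var i = 0
    while (i < keys.length) {
      val lo = keys(i) - preceding
      val hi = keys(i) + following
      // Admit: move the upper cursor past every row still inside the frame.
      while (upper < keys.length && keys(upper) <= hi) upper += 1
      // Drop: move the lower cursor past rows that fell out of the frame.
      while (lower < upper && keys(lower) < lo) lower += 1
      out(i) = (lower, upper)
      i += 1
    }
    out
  }

  /** Quadratic reference, used only to check the cursor version. */
  def bruteForce(keys: Array[Long], preceding: Long, following: Long): Array[(Int, Int)] =
    keys.map { k =>
      val lo = keys.indexWhere(_ >= k - preceding)
      val hi = keys.lastIndexWhere(_ <= k + following) + 1
      (lo, hi)
    }
}
```

Because each cursor only moves forward, the two inner loops together do at most `2 * keys.length` steps over a whole partition, which is the O(n) cursor cost that the class doc describes; the range aggregation over `[lower, upper)` would then be delegated to the tree.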



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

