yaooqinn commented on code in PR #55422: URL: https://github.com/apache/spark/pull/55422#discussion_r3124196492
########## sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowSegmentTree.scala: ########## @@ -0,0 +1,610 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.window + +import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap} + +import scala.collection.mutable + +import org.apache.spark.{SparkEnv, SparkException, TaskContext} +import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate +import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray +import org.apache.spark.sql.types.DataType +import org.apache.spark.util.ArrayImplicits._ + +/** + * Block-chunked segment tree for range aggregate queries over window partitions. + * + * See `the class documentation` + * for the full design (API contract Section 2, block-chunked memory layout Section 3, + * DeclarativeAggregate binding Section 4, error handling Section 5, test hooks Section 6). + * + * initial implementation scope: correctness only. The data layer uses + * `ExternalAppendOnlyUnsafeRowArray` to hold input rows (spillable). Each + * block materializes its own small segment tree (levels 0..h). Internal + * nodes are cached in an LRU keyed by block index; block root aggregates + * (block pre-aggregates) stay resident for all blocks. + * + * Note: the design doc Section 3.3 specifies leaves are NOT materialized and + * recomputed from the spillable array on demand. For initial implementation simplicity + * we materialize leaves inside the per-block internal node arrays. + * // TODO(SPARK-XXXXX) re-assess after Frame integration. + * + * @note Instances are not thread-safe. + */ +private[window] class WindowSegmentTree( + functions: Array[DeclarativeAggregate], + inputSchema: Seq[Attribute], + newMutableProjection: (Seq[Expression], Seq[Attribute]) => MutableProjection, + fanout: Int = WindowSegmentTree.DefaultFanout, + blockSize: Int = WindowSegmentTree.DefaultBlockSize, + maxCachedBlocks: Option[Int] = None, + spillThreshold: Int = Int.MaxValue, + inMemoryThreshold: Int = Int.MaxValue, + taskMemoryManager: TaskMemoryManager = null) + extends AutoCloseable { + + require(fanout >= 2, s"fanout must be >= 2, got $fanout") + require(blockSize >= 1, s"blockSize must be >= 1, got $blockSize") + require(functions.nonEmpty, "WindowSegmentTree requires at least one aggregate function") + maxCachedBlocks.foreach { n => + require(n >= 1, s"maxCachedBlocks must be >= 1 when specified, got $n") + } + require(taskMemoryManager != null, + "WindowSegmentTree requires a non-null TaskMemoryManager; " + + "in tests use `new TaskMemoryManager(new TestMemoryManager(conf), 0)`") + + // ---------- Schemas & projections ---------- + + private val bufferAttrs: Seq[AttributeReference] = + functions.flatMap(_.aggBufferAttributes).toImmutableArraySeq + private val rightAttrs: Seq[AttributeReference] = + functions.flatMap(_.inputAggBufferAttributes).toImmutableArraySeq + private val bufferDataTypes: IndexedSeq[DataType] = + bufferAttrs.map(_.dataType).toIndexedSeq + + private val initialValues: Seq[Expression] = functions.flatMap(_.initialValues).toIndexedSeq + private val updateExpressions: Seq[Expression] = + functions.flatMap(_.updateExpressions).toIndexedSeq + private val mergeExpressions: Seq[Expression] = + functions.flatMap(_.mergeExpressions).toIndexedSeq + + private[this] val initProj: MutableProjection = newMutableProjection(initialValues, Nil) + private[this] val updateProj: MutableProjection = + newMutableProjection(updateExpressions, bufferAttrs ++ inputSchema) + private[this] val mergeProj: MutableProjection = + newMutableProjection(mergeExpressions, bufferAttrs ++ rightAttrs) + + private val inputUnsafeProj: UnsafeProjection = + UnsafeProjection.create(inputSchema.map(_.dataType).toArray) + + private[this] val joinedRow = new JoinedRow() + + // ---------- State ---------- + + private var numRows: Int = 0 + private var numBlocks: Int = 0 + private var rowArray: ExternalAppendOnlyUnsafeRowArray = _ + private var closed: Boolean = false + + /** Always-resident per-block root aggregates. `blockAggregates(i)` = + * merged buffer over all rows in block i. */ + private var blockAggregates: Array[InternalRow] = Array.empty + + /** Rough byte width of one aggregate buffer row. Chosen at 16 B/field as a + * conservative heap-overhead-aware lower bound for a + * `SpecificInternalRow` slot: primitive `MutableValue` is 8 B, boxed + * references and object headers push the effective footprint higher. + * Tighter per-type sizing (real boxing cost, variable-length fields) is + * intentionally out of scope here; TaskMemoryManager remains the hard + * backstop via spill / OOM. + * TODO(SPARK-XXXXX): per-type width estimator keyed on + * `bufferDataTypes` (primitive 16 B, String/Binary/Decimal wider). */ + private val bufferWidthBytes: Long = { + val bytesPerField = 16L + math.max(1L, bufferDataTypes.size.toLong * bytesPerField) + } + + /** Number of aggregate-buffer slots cached per block (contract I5). + * + * Invariant: equals `sum over levels L of levels(L).length` for any + * block materialized by [[buildBlockLevels]]. Level 0 holds `blockSize` + * leaf buffers and each subsequent level holds `ceil(prev / fanout)` + * parent buffers until a single root remains. The iterative ceiling + * matches the allocation in [[buildBlockLevels]] for every + * `(blockSize, fanout)` pair, including non-power-of-`fanout` cases. + * For `blockSize == 1` the block is a single leaf with no parent + * levels, so this returns 1. + * TODO(SPARK-XXXXX): drop the leaf term when [[buildBlockLevels]] + * switches to on-demand leaf recomputation. */ + private val cachedSlotsPerBlock: Long = { + var n = blockSize.toLong + var sum = n + while (n > 1L) { + n = (n + fanout - 1) / fanout + sum += n + } + sum + } + + /** Bytes accounted per cached block (contract I5). Conservative: assumes + * every block is full; tail block (`numRows % blockSize != 0`) will + * hold fewer leaves, giving a small headroom. */ + private[this] val blockBytes: Long = + math.max(1L, cachedSlotsPerBlock * bufferWidthBytes) + + /** `spans(L)` = number of leaves covered by a single node at level L. Depends + * only on fanout + blockSize, so precomputed once. */ + private val spans: Array[Int] = { + val maxLevel = { + var lvl = 0 + var span = 1L + while (span < blockSize) { span *= fanout; lvl += 1 } + lvl + } + val arr = new Array[Int](maxLevel + 1) + var s = 1L + var i = 0 + while (i <= maxLevel) { + arr(i) = if (s > Int.MaxValue) Int.MaxValue else s.toInt + s *= fanout + i += 1 + } + arr + } + + /** LRU cache of per-block internal node arrays. Key = blockIdx. + * Value = `Array[Array[InternalRow]]` with levels(0..h). Auto-eviction via + * `removeEldestEntry` is disabled (contract I3) -- eviction is driven + * explicitly from [[ensureBlockLevels]] (capacity overflow) or + * [[SegTreeSpiller.spill]] (TMM pressure). Each cache entry maps 1:1 to + * one [[acquireBlockMemory]] accounting. Callers (e.g. + * `SegmentTreeWindowFunctionFrame`) should pass a W-aware + * value like `ceil(W / blockSize) + 2`. */ + private val blockLevelsCache: JLinkedHashMap[Integer, Array[Array[InternalRow]]] = + new JLinkedHashMap[Integer, Array[Array[InternalRow]]](16, 0.75f, true) { + override def removeEldestEntry( + eldest: JMap.Entry[Integer, Array[Array[InternalRow]]]): Boolean = false + } + + // ---------- Memory consumer (contract Section 2.2) ---------- + + /** + * Private MemoryConsumer tracking cached block levels under TMM. + * + * Heap accounting only (no Tungsten pages): uses + * [[MemoryConsumer.acquireMemory]] / [[MemoryConsumer.freeMemory]]. The + * [[MemoryConsumer]] base class records `used` via an `AtomicLong` when + * we call these -- so TMM's consumer-priority sort in + * `acquireExecutionMemory` sees our pressure accurately. + * + * @note `spill()` MUST NOT call `acquireMemory` (contract I1). + */ + private final class SegTreeSpiller extends MemoryConsumer( + taskMemoryManager, + taskMemoryManager.pageSizeBytes(), + taskMemoryManager.getTungstenMemoryMode()) { Review Comment: Addressed in `0dd8b9b20e0` — hardcoded `SegTreeSpiller` to `MemoryMode.ON_HEAP` (with scaladoc explaining why: cached state — `SpecificInternalRow` buffers, `Array[Array[InternalRow]]`, `ExternalAppendOnlyUnsafeRowArray` pages — is unambiguously on-heap regardless of Tungsten mode). T9 test flipped: `T9 OFF_HEAP Tungsten: spiller mode stays ON_HEAP (not OFF_HEAP)` now asserts the on-heap charge under `spark.memory.offHeap.enabled=true`. Benchmark refreshed in `0cdaffdbf50`. Happy to revisit the cross-pool spill-ordering concern as a follow-up if it ever shows up in practice. Thanks for catching this! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
