avantgardnerio opened a new issue, #23089:
URL: https://github.com/apache/datafusion/issues/23089

   ## What is the problem the feature is trying to solve?
   
   Several downstream consumers want to know, at runtime, the lex-min / lex-max 
of a partition's output along its declared output ordering. Today there is no 
way to ask an `ExecutionPlan` for this. The need shows up in at least three 
places:
   
   1. **Parallelizing single-partition window functions (RANGE frames, no 
PARTITION BY).** To fan a single sorted stream into N parallel output 
partitions without a partition key, a planner needs the global value range so 
it can compute interior split points. Statistics-based estimates from the 
catalog are too loose; we need what the sort actually produced. This is the use 
case that motivated the work — see the existing draft PR 
[#23026](https://github.com/apache/datafusion/pull/23026) for the full picture.
   2. **Range-elimination rules.** A future optimizer could prune downstream 
filters or skip irrelevant data when it can prove that a partition's range doesn't 
intersect a predicate's range. This needs observed extrema from the upstream operator.
   3. **Dynamic-range repartitioning, complementing `Partitioning::Range`.** 
[#22207](https://github.com/apache/datafusion/pull/22207) landed 
`Partitioning::Range { ordering, split_points }` for the *declarative* case — 
split points known at plan time, e.g. from a `TableProvider` declaring its 
output partitioning. A natural follow-up is a *dynamic* variant where split 
points are discovered at execute time from an upstream's actual data range. 
Runtime partition extrema are the missing primitive: the dynamic variant routes 
rows by a runtime-computed boundary set, and downstream operators learn the 
per-bucket boundaries through the same trait method.
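   As an illustration of how use cases 1 to 3 might consume the extrema, here is a minimal std-only sketch. It assumes a single `i64` leading sort key, where real keys are `Vec<ScalarValue>`, and it uses equal-width splits, which is only one possible policy. `interior_split_points`, `bucket_for` and `ranges_intersect` are hypothetical helpers, not DataFusion APIs.

   ```rust
   /// Hypothetical helper: up to `n - 1` interior split points that cut the
   /// closed range `[min, max]` of one `i64` sort key into equal-width buckets.
   /// Points equal to `min` and duplicates are dropped, so a narrow range may
   /// yield fewer than `n` buckets.
   pub fn interior_split_points(min: i64, max: i64, n: usize) -> Vec<i64> {
       if n <= 1 || max <= min {
           return Vec::new();
       }
       // Widen to i128 so `max - min` cannot overflow.
       let (lo, width, n) = (min as i128, max as i128 - min as i128, n as i128);
       let mut points: Vec<i64> = (1..n)
           .map(|i| (lo + width * i / n) as i64)
           .filter(|p| *p > min)
           .collect();
       // Points are non-decreasing, so dedup removes every repeat.
       points.dedup();
       points
   }

   /// Hypothetical routing step: bucket `k` holds values in
   /// `[splits[k - 1], splits[k])`, with open ends for the first and last.
   pub fn bucket_for(splits: &[i64], value: i64) -> usize {
       splits.partition_point(|s| *s <= value)
   }

   /// Hypothetical range-elimination check: can a partition whose key spans
   /// `[min, max]` contain any row matching the predicate range `[lo, hi]`?
   pub fn ranges_intersect(min: i64, max: i64, lo: i64, hi: i64) -> bool {
       min <= hi && lo <= max
   }
   ```

   For example, extrema of `-10` and `10` split four ways give split points `[-5, 0, 5]`, and a row with key `4` routes to bucket `2`.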
   
   Today the only way to get this is to drain the operator into a separate 
aggregate pass — which defeats the point.
   
   ## Describe the solution you'd like
   
   A small addition to `ExecutionPlan`:
   
   ```rust
   use datafusion_common::{Result, ScalarValue};

   pub enum ExtremaKind {
       /// `min` / `max` literally bound the partition's data.
       /// `SortExec` is the canonical implementer.
       Observed,
       /// `min` / `max` describe the partition's primary range.
       /// The partition deliberately carries rows outside that range
       /// (a "halo"); a downstream operator is contracted to filter
       /// back to the range. No operator in this branch returns
       /// `Expanded`; a future dynamic-range repartition (e.g. a
       /// `Partitioning::DynamicRange` routed through `RepartitionExec`)
       /// would.
       Expanded,
   }

   pub struct PartitionExtrema {
       pub kind: ExtremaKind,
       pub min: Vec<ScalarValue>,
       pub max: Vec<ScalarValue>,
       pub row_count: usize,
   }

   // New method on the existing `ExecutionPlan` trait (excerpt; the trait's
   // other items are elided). It defaults to `Ok(None)`, so operators that
   // do not override it report no extrema.
   pub trait ExecutionPlan {
       fn runtime_partition_extrema(
           &self,
           _partition: usize,
       ) -> Result<Option<PartitionExtrema>> {
           Ok(None)
       }
   }
   ```
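   To make the `Expanded` contract concrete, here is a minimal std-only sketch of what a downstream consumer would do with each kind. It assumes that `Vec<i64>` stands in for `Vec<ScalarValue>`, that every sort key is ascending (so Rust's built-in lexicographic `Vec` ordering matches the output ordering), and that the primary range is closed at both ends. `restrict_to_range` is a hypothetical helper, not part of the proposal.

   ```rust
   /// Stand-ins for the proposed types; `Vec<i64>` replaces `Vec<ScalarValue>`.
   #[derive(Debug, Clone, Copy, PartialEq, Eq)]
   pub enum ExtremaKind {
       Observed,
       Expanded,
   }

   #[derive(Debug, Clone, PartialEq, Eq)]
   pub struct PartitionExtrema {
       pub kind: ExtremaKind,
       pub min: Vec<i64>,
       pub max: Vec<i64>,
       pub row_count: usize,
   }

   /// Hypothetical consumer step: keep only rows whose sort key lies in the
   /// closed range `[min, max]` under lexicographic order. `Observed` extrema
   /// already bound every row, so nothing is filtered; for `Expanded` extrema
   /// this drops the halo rows.
   pub fn restrict_to_range(rows: Vec<Vec<i64>>, extrema: &PartitionExtrema) -> Vec<Vec<i64>> {
       match extrema.kind {
           ExtremaKind::Observed => rows,
           ExtremaKind::Expanded => rows
               .into_iter()
               .filter(|key| *key >= extrema.min && *key <= extrema.max)
               .collect(),
       }
   }
   ```

   With an `Expanded` range of `[1, 5]..=[3, 0]`, the rows `[1, 4]` and `[3, 1]` are halo rows and get dropped, while `[2, 9]` is kept because its leading key already places it inside the range.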
   
   Initial overrides:
   
   - `SortExec` populates a per-partition slot inside the sort code path; the 
override returns the slot's contents once execution has folded any sorted chunk 
into it. Reading before the upstream has been driven enough returns `Ok(None)` 
rather than panicking.
   - `BoundedWindowAggExec` passes through to its input (BWAG extends the input's 
equivalence properties, preserving the leading sort exprs at the same column 
indices in its output).
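   A rough std-only sketch of the two initial overrides. It is not the branch's code: `ExtremaSource`, `SortLike`, `WindowLike` and `observe_sorted_chunk` are hypothetical names, `Vec<i64>` stands in for `Vec<ScalarValue>`, `kind` is omitted for brevity, and a `Mutex`-guarded slot per partition is an assumed storage choice. Because each chunk is already sorted, its first row is its lex-min and its last row its lex-max, so folding a chunk only looks at its endpoints.

   ```rust
   use std::sync::{Arc, Mutex};

   #[derive(Debug, Clone, PartialEq, Eq)]
   pub struct Extrema {
       pub min: Vec<i64>,
       pub max: Vec<i64>,
       pub row_count: usize,
   }

   /// Hypothetical stand-in for the proposed `ExecutionPlan` method.
   pub trait ExtremaSource {
       fn runtime_partition_extrema(&self, partition: usize) -> Option<Extrema>;
   }

   /// Sort-like operator with one extrema slot per output partition.
   pub struct SortLike {
       slots: Vec<Mutex<Option<Extrema>>>,
   }

   impl SortLike {
       pub fn new(partitions: usize) -> Self {
           Self { slots: (0..partitions).map(|_| Mutex::new(None)).collect() }
       }

       /// Called from the (hypothetical) sort path with an already-sorted chunk.
       pub fn observe_sorted_chunk(&self, partition: usize, chunk: &[Vec<i64>]) {
           if chunk.is_empty() {
               return; // nothing to fold
           }
           let (first, last) = (&chunk[0], &chunk[chunk.len() - 1]);
           let mut slot = self.slots[partition].lock().unwrap();
           let folded = match slot.take() {
               None => Extrema { min: first.clone(), max: last.clone(), row_count: chunk.len() },
               Some(mut e) => {
                   if first < &e.min {
                       e.min = first.clone();
                   }
                   if last > &e.max {
                       e.max = last.clone();
                   }
                   e.row_count += chunk.len();
                   e
               }
           };
           *slot = Some(folded);
       }
   }

   impl ExtremaSource for SortLike {
       fn runtime_partition_extrema(&self, partition: usize) -> Option<Extrema> {
           // `None` until a chunk has been folded: the "read too early" case.
           self.slots[partition].lock().unwrap().clone()
       }
   }

   /// Window-like operator that preserves its input's leading sort keys,
   /// so it forwards the question to its input unchanged.
   pub struct WindowLike {
       pub input: Arc<dyn ExtremaSource>,
   }

   impl ExtremaSource for WindowLike {
       fn runtime_partition_extrema(&self, partition: usize) -> Option<Extrema> {
           self.input.runtime_partition_extrema(partition)
       }
   }
   ```

   Note that the passthrough body never inspects the extrema, which is the property design point 1 relies on.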
   
   Implementation lives at `coralogix/arrow-datafusion@brent/partition-extrema` 
([compare 
view](https://github.com/coralogix/arrow-datafusion/compare/apache:main...coralogix:arrow-datafusion:brent/partition-extrema)).
 Three commits, +500 / -10 LoC, branched off `apache/main`. Seven unit tests cover 
observer behavior and the caller contract; `cargo clippy --all-features --all-targets 
-- -D warnings` and `cargo fmt` are clean.
   
   ## Design points worth debating
   
   1. **One trait method, two semantics, encoded by `ExtremaKind`.** The 
downstream consumer of "Expanded" extrema and the upstream consumer of 
"Observed" extrema want exactly the same shape — `Vec<ScalarValue>` per 
endpoint, matched against the output ordering, populated once the partition is 
ready to be read. Splitting into two trait methods would force every 
passthrough operator to implement both with identical bodies. The contract is 
documented at each implementer's call site. Alternative: a pure enum with 
arm-specific data — rejected because passthroughs that don't care about the 
variant would still need to `match`. See the type-level rustdoc in the branch 
for the full rationale.
   2. **Caller contract: silent `Ok(None)` if read too early, not a panic.** 
Reading before the operator's slot is populated returns `Ok(None)`. The 
progress contract is that callers drive `execute(partition)` past the point 
where the implementing operator can know its extrema. Tradeoff: a too-early read 
fails silently instead of panicking, so misuse is easier to miss. The branch's 
rustdoc spells this out as a doc-invariant rather than a runtime gate.
   3. **Passthrough policy.** Operators whose output ordering matches the 
input's leading-key ordering MAY forward via 
`self.input.runtime_partition_extrema(...)`, preserving `kind` unchanged. The 
trait does not enforce this — it's per-operator. `BoundedWindowAggExec` is 
wired in this branch; `ProjectionExec` (conditional on the leading sort col 
surviving) and `SortPreservingMergeExec` (an N→1 reducer, not a passthrough) 
are follow-ups.
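   One way the `ProjectionExec` follow-up could decide whether to forward, sketched under stated assumptions: sort keys are plain input columns identified by index (real keys are arbitrary expressions), `Vec<i64>` stands in for `Vec<ScalarValue>`, and `project_extrema` is a hypothetical helper. Cutting `Observed` extrema to the longest surviving prefix of sort keys is sound because lexicographic order compares the prefix first, so the prefix of the lex-min row is the lex-min of the prefixes. The sketch is conservative for `Expanded`, since a truncated bound would admit halo rows that the full bound excludes.

   ```rust
   #[derive(Debug, Clone, Copy, PartialEq, Eq)]
   pub enum ExtremaKind {
       Observed,
       Expanded,
   }

   #[derive(Debug, Clone, PartialEq, Eq)]
   pub struct PartitionExtrema {
       pub kind: ExtremaKind,
       pub min: Vec<i64>,
       pub max: Vec<i64>,
       pub row_count: usize,
   }

   /// `ordering_cols[i]` is the input column of the i-th sort key;
   /// `projected` lists the input columns the projection keeps.
   pub fn project_extrema(
       input: Option<PartitionExtrema>,
       ordering_cols: &[usize],
       projected: &[usize],
   ) -> Option<PartitionExtrema> {
       let input = input?;
       // Length of the longest prefix of sort keys that survives.
       let kept = ordering_cols
           .iter()
           .take_while(|&&c| projected.contains(&c))
           .count();
       if kept == ordering_cols.len() {
           return Some(input); // every sort key survives: forward unchanged
       }
       if kept == 0 || input.kind == ExtremaKind::Expanded {
           return None; // leading key dropped, or truncation would widen a halo
       }
       Some(PartitionExtrema {
           kind: input.kind,
           min: input.min.get(..kept)?.to_vec(),
           max: input.max.get(..kept)?.to_vec(),
           row_count: input.row_count,
       })
   }
   ```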
   
   ## Describe alternatives you've considered
   
   - **Statistics-shaped runtime stats.** The first version of the spike used 
`Statistics`/`ColumnStatistics`. It doesn't fit: sort keys are arbitrary 
expressions, not just columns, and the trailing-key values of the lex-extreme row 
are generally not the per-key min/max of those keys.
   - **Async signaling on the trait method.** Overengineered for our case — 
every implementer that has an answer has it at a known point in execution 
(post-buffer for `SortExec`). The caller contract handles "not yet" via 
`Ok(None)`.
   - **Side-channel between operators** (instead of a uniform trait). Threading 
boundary state through specific operator pairs is fragile and doesn't 
generalize to other range-aware optimizations (point 2 above).
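   A tiny std-only illustration of the mismatch with `ColumnStatistics`-style stats, assuming two ascending `i64` sort keys; `lex_extrema` and `column_extrema` are hypothetical helpers written for this example.

   ```rust
   /// Lexicographic min/max of multi-column sort keys (all ascending): the
   /// first and last rows a sort over these keys would produce.
   pub fn lex_extrema(rows: &[Vec<i64>]) -> Option<(Vec<i64>, Vec<i64>)> {
       let min = rows.iter().min()?.clone();
       let max = rows.iter().max()?.clone();
       Some((min, max))
   }

   /// Independent per-column min/max, which is what column statistics describe.
   pub fn column_extrema(rows: &[Vec<i64>], col: usize) -> Option<(i64, i64)> {
       let min = rows.iter().map(|r| r[col]).min()?;
       let max = rows.iter().map(|r| r[col]).max()?;
       Some((min, max))
   }
   ```

   For the rows `[1, 9]`, `[2, 0]`, `[1, 3]`, the lex extrema are `[1, 3]` and `[2, 0]`, while the second column's own range is `0..=9`: neither trailing value of the lex-extreme rows matches the column's min or max.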
   
   ## Coexistence with existing work
   
   `Partitioning::Range` / `RangePartitioning { ordering, split_points }` from 
[#22207](https://github.com/apache/datafusion/pull/22207), 
[#22607](https://github.com/apache/datafusion/pull/22607), 
[#22777](https://github.com/apache/datafusion/pull/22777) (@gene-bordegaray, 
Datadog) covers the declarative case: split points known at plan time. 
`PartitionExtrema` doesn't change that path. What it enables is a future 
*dynamic* sibling — where the boundary set is discovered at runtime — without 
inventing a parallel runtime-stats facility. @stuhood's [comment on 
#22395](https://github.com/apache/datafusion/issues/22395#issuecomment-4618822694)
 about overlapping output partitioning is the design point a dynamic-range 
variant could explore.
   
   ## Additional context
   
   This issue is the discussion home for the API. If reviewers agree on the 
shape, the branch can be cleaned into a single PR (pure addition, zero behavior 
change, default `Ok(None)` so nothing currently in tree calls or implements the 
method). Window-function parallelization and a dynamic-range repartitioning 
variant land as separate follow-ups on top.

