This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 3087ca8a92 perf: optimize `NthValue` when `ignore_nulls` is true 
(#19496)
3087ca8a92 is described below

commit 3087ca8a928f9593f05603de2ac615a57e6ab5e2
Author: Mikhail Zabaluev <[email protected]>
AuthorDate: Sat Jan 10 04:19:53 2026 +0200

    perf: optimize `NthValue` when `ignore_nulls` is true (#19496)
    
    ## Rationale for this change
    
    The `PartitionEvaluator` implementation for `NthValue` in DataFusion has
    a few shortcomings:
    
    * When nulls are ignored (meaning the count should skip over them), the
    evaluation collects an array of all valid indices, to select at most one
    index accordingly to the `First`/`Last`/`Nth` case.
    * The `memoize` implementation gives up in the same condition, even
    after performing part of the logic!
    
    ## What changes are included in this PR?
    
    Use only as much iteration over the valid indices as needed for the
    function case, without collecting all indices.
    The `memoize` implementation does the right thing for `FirstValue` with
    `ignore_nulls` set to true, or returns early for other function cases.
    
    ## Are these changes tested?
    
    All existing tests pass for `FirstValue`/`LastValue`/`NthValue`.
    
    ## Are there any user-facing changes?
    
    No.
---
 Cargo.lock                                       |   1 +
 datafusion/functions-window/Cargo.toml           |   8 +
 datafusion/functions-window/benches/nth_value.rs | 263 +++++++++++++++++++++++
 datafusion/functions-window/src/nth_value.rs     | 182 +++++++++-------
 4 files changed, 372 insertions(+), 82 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index ad347e1072..4f105dc1b4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2310,6 +2310,7 @@ name = "datafusion-functions-window"
 version = "51.0.0"
 dependencies = [
  "arrow",
+ "criterion",
  "datafusion-common",
  "datafusion-doc",
  "datafusion-expr",
diff --git a/datafusion/functions-window/Cargo.toml 
b/datafusion/functions-window/Cargo.toml
index 42690907ae..fae71e180e 100644
--- a/datafusion/functions-window/Cargo.toml
+++ b/datafusion/functions-window/Cargo.toml
@@ -51,3 +51,11 @@ datafusion-physical-expr = { workspace = true }
 datafusion-physical-expr-common = { workspace = true }
 log = { workspace = true }
 paste = { workspace = true }
+
+[dev-dependencies]
+arrow = { workspace = true, features = ["test_utils"] }
+criterion = { workspace = true }
+
+[[bench]]
+name = "nth_value"
+harness = false
diff --git a/datafusion/functions-window/benches/nth_value.rs 
b/datafusion/functions-window/benches/nth_value.rs
new file mode 100644
index 0000000000..00daf9fa4f
--- /dev/null
+++ b/datafusion/functions-window/benches/nth_value.rs
@@ -0,0 +1,263 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::hint::black_box;
+use std::ops::Range;
+use std::slice;
+use std::sync::Arc;
+
+use arrow::array::ArrayRef;
+use arrow::datatypes::{DataType, Field, FieldRef, Int64Type};
+use arrow::util::bench_util::create_primitive_array;
+
+use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
+use datafusion_common::ScalarValue;
+use datafusion_expr::{PartitionEvaluator, WindowUDFImpl};
+use datafusion_functions_window::nth_value::{NthValue, NthValueKind};
+use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
+use datafusion_physical_expr::expressions::{Column, Literal};
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+
+const ARRAY_SIZE: usize = 8192;
+
+/// Creates a partition evaluator for FIRST_VALUE, LAST_VALUE, or NTH_VALUE
+fn create_evaluator(
+    kind: NthValueKind,
+    ignore_nulls: bool,
+    n: Option<i64>,
+) -> Box<dyn PartitionEvaluator> {
+    let expr = Arc::new(Column::new("c", 0)) as Arc<dyn PhysicalExpr>;
+    let input_field: FieldRef = Field::new("c", DataType::Int64, true).into();
+    let input_fields = vec![input_field];
+
+    let (nth_value, exprs): (NthValue, Vec<Arc<dyn PhysicalExpr>>) = match 
kind {
+        NthValueKind::First => (NthValue::first(), vec![expr]),
+        NthValueKind::Last => (NthValue::last(), vec![expr]),
+        NthValueKind::Nth => {
+            let n_value =
+                Arc::new(Literal::new(ScalarValue::Int64(n))) as Arc<dyn 
PhysicalExpr>;
+            (NthValue::nth(), vec![expr, n_value])
+        }
+    };
+
+    let args = PartitionEvaluatorArgs::new(&exprs, &input_fields, false, 
ignore_nulls);
+    nth_value.partition_evaluator(args).unwrap()
+}
+
+fn bench_nth_value_ignore_nulls(c: &mut Criterion) {
+    let mut group = c.benchmark_group("nth_value_ignore_nulls");
+
+    // Test different null densities
+    let null_densities = [0.0, 0.3, 0.5, 0.8];
+
+    for null_density in null_densities {
+        let values = Arc::new(create_primitive_array::<Int64Type>(
+            ARRAY_SIZE,
+            null_density,
+        )) as ArrayRef;
+        let null_pct = (null_density * 100.0) as u32;
+
+        // FIRST_VALUE with ignore_nulls - expanding window
+        group.bench_function(
+            BenchmarkId::new("first_value_expanding", 
format!("{null_pct}%_nulls")),
+            |b| {
+                b.iter(|| {
+                    let mut evaluator = create_evaluator(NthValueKind::First, 
true, None);
+                    let values_slice = slice::from_ref(&values);
+                    for i in 0..values.len() {
+                        let range = Range {
+                            start: 0,
+                            end: i + 1,
+                        };
+                        black_box(evaluator.evaluate(values_slice, 
&range).unwrap());
+                    }
+                })
+            },
+        );
+
+        // LAST_VALUE with ignore_nulls - expanding window
+        group.bench_function(
+            BenchmarkId::new("last_value_expanding", 
format!("{null_pct}%_nulls")),
+            |b| {
+                b.iter(|| {
+                    let mut evaluator = create_evaluator(NthValueKind::Last, 
true, None);
+                    let values_slice = slice::from_ref(&values);
+                    for i in 0..values.len() {
+                        let range = Range {
+                            start: 0,
+                            end: i + 1,
+                        };
+                        black_box(evaluator.evaluate(values_slice, 
&range).unwrap());
+                    }
+                })
+            },
+        );
+
+        // NTH_VALUE(col, 10) with ignore_nulls - get 10th non-null value
+        group.bench_function(
+            BenchmarkId::new("nth_value_10_expanding", 
format!("{null_pct}%_nulls")),
+            |b| {
+                b.iter(|| {
+                    let mut evaluator =
+                        create_evaluator(NthValueKind::Nth, true, Some(10));
+                    let values_slice = slice::from_ref(&values);
+                    for i in 0..values.len() {
+                        let range = Range {
+                            start: 0,
+                            end: i + 1,
+                        };
+                        black_box(evaluator.evaluate(values_slice, 
&range).unwrap());
+                    }
+                })
+            },
+        );
+
+        // NTH_VALUE(col, -10) with ignore_nulls - get 10th from last non-null 
value
+        group.bench_function(
+            BenchmarkId::new("nth_value_neg10_expanding", 
format!("{null_pct}%_nulls")),
+            |b| {
+                b.iter(|| {
+                    let mut evaluator =
+                        create_evaluator(NthValueKind::Nth, true, Some(-10));
+                    let values_slice = slice::from_ref(&values);
+                    for i in 0..values.len() {
+                        let range = Range {
+                            start: 0,
+                            end: i + 1,
+                        };
+                        black_box(evaluator.evaluate(values_slice, 
&range).unwrap());
+                    }
+                })
+            },
+        );
+
+        // Sliding window benchmarks with 100-row window
+        let window_size: usize = 100;
+
+        group.bench_function(
+            BenchmarkId::new("first_value_sliding_100", 
format!("{null_pct}%_nulls")),
+            |b| {
+                b.iter(|| {
+                    let mut evaluator = create_evaluator(NthValueKind::First, 
true, None);
+                    let values_slice = slice::from_ref(&values);
+                    for i in 0..values.len() {
+                        let start = i.saturating_sub(window_size - 1);
+                        let range = Range { start, end: i + 1 };
+                        black_box(evaluator.evaluate(values_slice, 
&range).unwrap());
+                    }
+                })
+            },
+        );
+
+        group.bench_function(
+            BenchmarkId::new("last_value_sliding_100", 
format!("{null_pct}%_nulls")),
+            |b| {
+                b.iter(|| {
+                    let mut evaluator = create_evaluator(NthValueKind::Last, 
true, None);
+                    let values_slice = slice::from_ref(&values);
+                    for i in 0..values.len() {
+                        let start = i.saturating_sub(window_size - 1);
+                        let range = Range { start, end: i + 1 };
+                        black_box(evaluator.evaluate(values_slice, 
&range).unwrap());
+                    }
+                })
+            },
+        );
+    }
+
+    group.finish();
+
+    // Comparison benchmarks: ignore_nulls vs respect_nulls
+    let mut comparison_group = c.benchmark_group("nth_value_nulls_comparison");
+    let values_with_nulls =
+        Arc::new(create_primitive_array::<Int64Type>(ARRAY_SIZE, 0.5)) as 
ArrayRef;
+
+    // FIRST_VALUE comparison
+    comparison_group.bench_function(
+        BenchmarkId::new("first_value", "ignore_nulls"),
+        |b| {
+            b.iter(|| {
+                let mut evaluator = create_evaluator(NthValueKind::First, 
true, None);
+                let values_slice = slice::from_ref(&values_with_nulls);
+                for i in 0..values_with_nulls.len() {
+                    let range = Range {
+                        start: 0,
+                        end: i + 1,
+                    };
+                    black_box(evaluator.evaluate(values_slice, 
&range).unwrap());
+                }
+            })
+        },
+    );
+
+    comparison_group.bench_function(
+        BenchmarkId::new("first_value", "respect_nulls"),
+        |b| {
+            b.iter(|| {
+                let mut evaluator = create_evaluator(NthValueKind::First, 
false, None);
+                let values_slice = slice::from_ref(&values_with_nulls);
+                for i in 0..values_with_nulls.len() {
+                    let range = Range {
+                        start: 0,
+                        end: i + 1,
+                    };
+                    black_box(evaluator.evaluate(values_slice, 
&range).unwrap());
+                }
+            })
+        },
+    );
+
+    // NTH_VALUE comparison
+    comparison_group.bench_function(
+        BenchmarkId::new("nth_value_10", "ignore_nulls"),
+        |b| {
+            b.iter(|| {
+                let mut evaluator = create_evaluator(NthValueKind::Nth, true, 
Some(10));
+                let values_slice = slice::from_ref(&values_with_nulls);
+                for i in 0..values_with_nulls.len() {
+                    let range = Range {
+                        start: 0,
+                        end: i + 1,
+                    };
+                    black_box(evaluator.evaluate(values_slice, 
&range).unwrap());
+                }
+            })
+        },
+    );
+
+    comparison_group.bench_function(
+        BenchmarkId::new("nth_value_10", "respect_nulls"),
+        |b| {
+            b.iter(|| {
+                let mut evaluator = create_evaluator(NthValueKind::Nth, false, 
Some(10));
+                let values_slice = slice::from_ref(&values_with_nulls);
+                for i in 0..values_with_nulls.len() {
+                    let range = Range {
+                        start: 0,
+                        end: i + 1,
+                    };
+                    black_box(evaluator.evaluate(values_slice, 
&range).unwrap());
+                }
+            })
+        },
+    );
+
+    comparison_group.finish();
+}
+
+criterion_group!(benches, bench_nth_value_ignore_nulls);
+criterion_main!(benches);
diff --git a/datafusion/functions-window/src/nth_value.rs 
b/datafusion/functions-window/src/nth_value.rs
index be08f25ec4..c62f0a9ae4 100644
--- a/datafusion/functions-window/src/nth_value.rs
+++ b/datafusion/functions-window/src/nth_value.rs
@@ -19,6 +19,7 @@
 
 use crate::utils::{get_scalar_value_from_args, get_signed_integer};
 
+use arrow::buffer::NullBuffer;
 use arrow::datatypes::FieldRef;
 use datafusion_common::arrow::array::ArrayRef;
 use datafusion_common::arrow::datatypes::{DataType, Field};
@@ -370,6 +371,33 @@ impl PartitionEvaluator for NthValueEvaluator {
     fn memoize(&mut self, state: &mut WindowAggState) -> Result<()> {
         let out = &state.out_col;
         let size = out.len();
+        if self.ignore_nulls {
+            match self.state.kind {
+                // Prune on first non-null output in case of FIRST_VALUE
+                NthValueKind::First => {
+                    if let Some(nulls) = out.nulls() {
+                        if self.state.finalized_result.is_none() {
+                            if let Some(valid_index) = 
nulls.valid_indices().next() {
+                                let result =
+                                    ScalarValue::try_from_array(out, 
valid_index)?;
+                                self.state.finalized_result = Some(result);
+                            } else {
+                                // The output is empty or all nulls, ignore
+                            }
+                        }
+                        if state.window_frame_range.start < 
state.window_frame_range.end {
+                            state.window_frame_range.start =
+                                state.window_frame_range.end - 1;
+                        }
+                        return Ok(());
+                    } else {
+                        // Fall through to the main case because there are no 
nulls
+                    }
+                }
+                // Do not memoize for other kinds when nulls are ignored
+                NthValueKind::Last | NthValueKind::Nth => return Ok(()),
+            }
+        }
         let mut buffer_size = 1;
         // Decide if we arrived at a final result yet:
         let (is_prunable, is_reverse_direction) = match self.state.kind {
@@ -397,8 +425,7 @@ impl PartitionEvaluator for NthValueEvaluator {
                 }
             }
         };
-        // Do not memoize results when nulls are ignored.
-        if is_prunable && !self.ignore_nulls {
+        if is_prunable {
             if self.state.finalized_result.is_none() && !is_reverse_direction {
                 let result = ScalarValue::try_from_array(out, size - 1)?;
                 self.state.finalized_result = Some(result);
@@ -424,99 +451,90 @@ impl PartitionEvaluator for NthValueEvaluator {
                 // We produce None if the window is empty.
                 return ScalarValue::try_from(arr.data_type());
             }
+            match self.valid_index(arr, range) {
+                Some(index) => ScalarValue::try_from_array(arr, index),
+                None => ScalarValue::try_from(arr.data_type()),
+            }
+        }
+    }
 
-            // If null values exist and need to be ignored, extract the valid 
indices.
-            let valid_indices = if self.ignore_nulls {
-                // Calculate valid indices, inside the window frame boundaries.
-                let slice = arr.slice(range.start, n_range);
-                match slice.nulls() {
-                    Some(nulls) => {
-                        let valid_indices = nulls
-                            .valid_indices()
-                            .map(|idx| {
-                                // Add offset `range.start` to valid indices, 
to point correct index in the original arr.
-                                idx + range.start
-                            })
-                            .collect::<Vec<_>>();
-                        if valid_indices.is_empty() {
-                            // If all values are null, return directly.
-                            return ScalarValue::try_from(arr.data_type());
-                        }
-                        Some(valid_indices)
-                    }
-                    None => None,
-                }
-            } else {
-                None
-            };
-            match self.state.kind {
-                NthValueKind::First => {
-                    if let Some(valid_indices) = &valid_indices {
-                        ScalarValue::try_from_array(arr, valid_indices[0])
+    fn supports_bounded_execution(&self) -> bool {
+        true
+    }
+
+    fn uses_window_frame(&self) -> bool {
+        true
+    }
+}
+
+impl NthValueEvaluator {
+    fn valid_index(&self, array: &ArrayRef, range: &Range<usize>) -> 
Option<usize> {
+        let n_range = range.end - range.start;
+        if self.ignore_nulls {
+            // Calculate valid indices, inside the window frame boundaries.
+            let slice = array.slice(range.start, n_range);
+            if let Some(nulls) = slice.nulls()
+                && nulls.null_count() > 0
+            {
+                return self.valid_index_with_nulls(nulls, range.start);
+            }
+        }
+        // Either no nulls, or nulls are regarded as valid rows
+        match self.state.kind {
+            NthValueKind::First => Some(range.start),
+            NthValueKind::Last => Some(range.end - 1),
+            NthValueKind::Nth => match self.n.cmp(&0) {
+                Ordering::Greater => {
+                    // SQL indices are not 0-based.
+                    let index = (self.n as usize) - 1;
+                    if index >= n_range {
+                        // Outside the range, return NULL:
+                        None
                     } else {
-                        ScalarValue::try_from_array(arr, range.start)
+                        Some(range.start + index)
                     }
                 }
-                NthValueKind::Last => {
-                    if let Some(valid_indices) = &valid_indices {
-                        ScalarValue::try_from_array(
-                            arr,
-                            valid_indices[valid_indices.len() - 1],
-                        )
+                Ordering::Less => {
+                    let reverse_index = (-self.n) as usize;
+                    if n_range < reverse_index {
+                        // Outside the range, return NULL:
+                        None
                     } else {
-                        ScalarValue::try_from_array(arr, range.end - 1)
+                        Some(range.end - reverse_index)
                     }
                 }
-                NthValueKind::Nth => {
-                    match self.n.cmp(&0) {
-                        Ordering::Greater => {
-                            // SQL indices are not 0-based.
-                            let index = (self.n as usize) - 1;
-                            if index >= n_range {
-                                // Outside the range, return NULL:
-                                ScalarValue::try_from(arr.data_type())
-                            } else if let Some(valid_indices) = valid_indices {
-                                if index >= valid_indices.len() {
-                                    return 
ScalarValue::try_from(arr.data_type());
-                                }
-                                ScalarValue::try_from_array(&arr, 
valid_indices[index])
-                            } else {
-                                ScalarValue::try_from_array(arr, range.start + 
index)
-                            }
-                        }
-                        Ordering::Less => {
-                            let reverse_index = (-self.n) as usize;
-                            if n_range < reverse_index {
-                                // Outside the range, return NULL:
-                                ScalarValue::try_from(arr.data_type())
-                            } else if let Some(valid_indices) = valid_indices {
-                                if reverse_index > valid_indices.len() {
-                                    return 
ScalarValue::try_from(arr.data_type());
-                                }
-                                let new_index =
-                                    valid_indices[valid_indices.len() - 
reverse_index];
-                                ScalarValue::try_from_array(&arr, new_index)
-                            } else {
-                                ScalarValue::try_from_array(
-                                    arr,
-                                    range.start + n_range - reverse_index,
-                                )
-                            }
+                Ordering::Equal => None,
+            },
+        }
+    }
+
+    fn valid_index_with_nulls(&self, nulls: &NullBuffer, offset: usize) -> 
Option<usize> {
+        match self.state.kind {
+            NthValueKind::First => nulls.valid_indices().next().map(|idx| idx 
+ offset),
+            NthValueKind::Last => nulls.valid_indices().last().map(|idx| idx + 
offset),
+            NthValueKind::Nth => {
+                match self.n.cmp(&0) {
+                    Ordering::Greater => {
+                        // SQL indices are not 0-based.
+                        let index = (self.n as usize) - 1;
+                        nulls.valid_indices().nth(index).map(|idx| idx + 
offset)
+                    }
+                    Ordering::Less => {
+                        let reverse_index = (-self.n) as usize;
+                        let valid_indices_len = nulls.len() - 
nulls.null_count();
+                        if reverse_index > valid_indices_len {
+                            return None;
                         }
-                        Ordering::Equal => 
ScalarValue::try_from(arr.data_type()),
+                        nulls
+                            .valid_indices()
+                            .nth(valid_indices_len - reverse_index)
+                            .map(|idx| idx + offset)
                     }
+                    Ordering::Equal => None,
                 }
             }
         }
     }
-
-    fn supports_bounded_execution(&self) -> bool {
-        true
-    }
-
-    fn uses_window_frame(&self) -> bool {
-        true
-    }
 }
 
 #[cfg(test)]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to