This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 4ac1428025 Convert `nth_value` to UDAF (#11287)
4ac1428025 is described below

commit 4ac14280250fd7e1fad8f749702a7e7d025c472d
Author: jcsherin <[email protected]>
AuthorDate: Mon Jul 8 23:07:37 2024 +0530

    Convert `nth_value` to UDAF (#11287)
    
    * Copies `NthValueAccumulator` to `functions-aggregate`
    
    * Partial implementation of `AggregateUDFImpl`
    
    Pending methods are:
    - `accumulator`
    - `state_fields`
    - `reverse_expr`
    
    * Implements `accumulator` method
    
    * Retains existing comments verbatim
    
    * Removes unnecessary path prefix
    
    * Implements `reverse_expr` method
    
    * Adds `nullable` field to `NthValue`
    
    * Revert to existing name
    
    * Implements `state_fields` method
    
    * Removes `nth_value` from `physical-expr`
    
    * Adds default
    
    * Exports `nth_value`
    
    * Fixes build error in physical plan roundtrip test
    
    * Minor: formatting
    
    * Parses `N` from input expression
    
    * Fixes build error by using `nth_value_udaf`
    
    * Fixes `reverse_expr` by passing correct `N`
    
    * Update plan with lowercase UDF name
    
    * Updates error message for incorrect no. of arguments
    
    This error message is manually formatted to remain consistent with
    existing error statements. It is not formatted by running:
    ```
    cargo test -p datafusion-sqllogictest --test sqllogictests errors -- 
--complete
    ```
    
    * Fixes nullable "item" in `state_fields`
    
    * Minor: fix formatting after resolving conflicts
    
    * Updates multiple existing plans with lowercase name
    
    * Implements `retract_batch` for window aggregations
    
    * Fixes: regex mismatch for error message in CI
    
    * Revert "Updates multiple existing plans with lowercase name"
    
    This reverts commit 1913efda49e585816286b54b371d4166ac894d1f.
    
    * Revert "Implements `retract_batch` for window aggregations"
    
    This reverts commit 4bb204f6ec8028c4e3313db5af3fabfcdaf7fea8.
    
    * Fixes: use builtin window function instead of udaf
    
    * Revert "Updates error message for incorrect no. of arguments"
    
    This reverts commit fa61ce62dcae6eae6f8e9c9900ebf8cff5023bc0.
    
    * Refactor: renames field and method
    
    * Removes hack for nullability
    
    * Minor: refactors `reverse_expr`
    
    * Minor: removes unnecessary path prefix
    
    * Minor: cleanup arguments for creating aggregate expr
    
    * Refactor: extracts `merge_ordered_arrays` to `physical-expr-common`
    
    * Minor: adds todo for configuring nullability
    
    * Retrigger CI
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion-cli/Cargo.lock                          |   1 +
 datafusion/expr/src/aggregate_function.rs          |   7 -
 datafusion/expr/src/type_coercion/aggregates.rs    |   1 -
 datafusion/functions-aggregate/src/lib.rs          |   3 +
 .../src}/nth_value.rs                              | 206 ++++++++++-----------
 datafusion/functions-array/Cargo.toml              |   1 +
 datafusion/functions-array/src/planner.rs          |   5 +-
 .../src/aggregate/merge_arrays.rs                  | 195 +++++++++++++++++++
 .../physical-expr-common/src/aggregate/mod.rs      |   1 +
 .../src/aggregate/array_agg_ordered.rs             | 181 +-----------------
 datafusion/physical-expr/src/aggregate/build_in.rs |  24 +--
 datafusion/physical-expr/src/aggregate/mod.rs      |   1 -
 datafusion/physical-expr/src/expressions/mod.rs    |   1 -
 datafusion/proto/proto/datafusion.proto            |   2 +-
 datafusion/proto/src/generated/pbjson.rs           |   3 -
 datafusion/proto/src/generated/prost.rs            |   7 +-
 datafusion/proto/src/logical_plan/from_proto.rs    |   1 -
 datafusion/proto/src/logical_plan/to_proto.rs      |   4 -
 datafusion/proto/src/physical_plan/to_proto.rs     |   6 +-
 .../proto/tests/cases/roundtrip_physical_plan.rs   |  21 ++-
 datafusion/sql/src/expr/function.rs                |   6 +-
 .../test_files/agg_func_substitute.slt             |  30 +--
 22 files changed, 350 insertions(+), 357 deletions(-)

diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 4fce2ec500..500e731a5b 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1319,6 +1319,7 @@ dependencies = [
  "datafusion-execution",
  "datafusion-expr",
  "datafusion-functions",
+ "datafusion-functions-aggregate",
  "itertools",
  "log",
  "paste",
diff --git a/datafusion/expr/src/aggregate_function.rs 
b/datafusion/expr/src/aggregate_function.rs
index 760952d948..23e98714df 100644
--- a/datafusion/expr/src/aggregate_function.rs
+++ b/datafusion/expr/src/aggregate_function.rs
@@ -39,8 +39,6 @@ pub enum AggregateFunction {
     Max,
     /// Aggregation into an array
     ArrayAgg,
-    /// N'th value in a group according to some ordering
-    NthValue,
 }
 
 impl AggregateFunction {
@@ -50,7 +48,6 @@ impl AggregateFunction {
             Min => "MIN",
             Max => "MAX",
             ArrayAgg => "ARRAY_AGG",
-            NthValue => "NTH_VALUE",
         }
     }
 }
@@ -69,7 +66,6 @@ impl FromStr for AggregateFunction {
             "max" => AggregateFunction::Max,
             "min" => AggregateFunction::Min,
             "array_agg" => AggregateFunction::ArrayAgg,
-            "nth_value" => AggregateFunction::NthValue,
             _ => {
                 return plan_err!("There is no built-in function named {name}");
             }
@@ -114,7 +110,6 @@ impl AggregateFunction {
                 coerced_data_types[0].clone(),
                 input_expr_nullable[0],
             )))),
-            AggregateFunction::NthValue => Ok(coerced_data_types[0].clone()),
         }
     }
 
@@ -124,7 +119,6 @@ impl AggregateFunction {
         match self {
             AggregateFunction::Max | AggregateFunction::Min => Ok(true),
             AggregateFunction::ArrayAgg => Ok(false),
-            AggregateFunction::NthValue => Ok(true),
         }
     }
 }
@@ -147,7 +141,6 @@ impl AggregateFunction {
                     .collect::<Vec<_>>();
                 Signature::uniform(1, valid, Volatility::Immutable)
             }
-            AggregateFunction::NthValue => Signature::any(2, 
Volatility::Immutable),
         }
     }
 }
diff --git a/datafusion/expr/src/type_coercion/aggregates.rs 
b/datafusion/expr/src/type_coercion/aggregates.rs
index 0f7464b96b..fbec6e2f80 100644
--- a/datafusion/expr/src/type_coercion/aggregates.rs
+++ b/datafusion/expr/src/type_coercion/aggregates.rs
@@ -101,7 +101,6 @@ pub fn coerce_types(
             // unpack the dictionary to get the value
             get_min_max_result_type(input_types)
         }
-        AggregateFunction::NthValue => Ok(input_types.to_vec()),
     }
 }
 
diff --git a/datafusion/functions-aggregate/src/lib.rs 
b/datafusion/functions-aggregate/src/lib.rs
index fc485a284a..6ae2dfb369 100644
--- a/datafusion/functions-aggregate/src/lib.rs
+++ b/datafusion/functions-aggregate/src/lib.rs
@@ -74,6 +74,7 @@ pub mod average;
 pub mod bit_and_or_xor;
 pub mod bool_and_or;
 pub mod grouping;
+pub mod nth_value;
 pub mod string_agg;
 
 use crate::approx_percentile_cont::approx_percentile_cont_udaf;
@@ -105,6 +106,7 @@ pub mod expr_fn {
     pub use super::first_last::last_value;
     pub use super::grouping::grouping;
     pub use super::median::median;
+    pub use super::nth_value::nth_value;
     pub use super::regr::regr_avgx;
     pub use super::regr::regr_avgy;
     pub use super::regr::regr_count;
@@ -157,6 +159,7 @@ pub fn all_default_aggregate_functions() -> 
Vec<Arc<AggregateUDF>> {
         bool_and_or::bool_or_udaf(),
         average::avg_udaf(),
         grouping::grouping_udaf(),
+        nth_value::nth_value_udaf(),
     ]
 }
 
diff --git a/datafusion/physical-expr/src/aggregate/nth_value.rs 
b/datafusion/functions-aggregate/src/nth_value.rs
similarity index 77%
rename from datafusion/physical-expr/src/aggregate/nth_value.rs
rename to datafusion/functions-aggregate/src/nth_value.rs
index b75ecd1066..6719c673c5 100644
--- a/datafusion/physical-expr/src/aggregate/nth_value.rs
+++ b/datafusion/functions-aggregate/src/nth_value.rs
@@ -22,149 +22,149 @@ use std::any::Any;
 use std::collections::VecDeque;
 use std::sync::Arc;
 
-use crate::aggregate::array_agg_ordered::merge_ordered_arrays;
-use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
-use crate::expressions::{format_state_name, Literal};
-use crate::{
-    reverse_order_bys, AggregateExpr, LexOrdering, PhysicalExpr, 
PhysicalSortExpr,
-};
-
-use arrow_array::cast::AsArray;
-use arrow_array::{new_empty_array, ArrayRef, StructArray};
+use arrow::array::{new_empty_array, ArrayRef, AsArray, StructArray};
 use arrow_schema::{DataType, Field, Fields};
+
 use datafusion_common::utils::{array_into_list_array_nullable, get_row_at_idx};
-use datafusion_common::{exec_err, internal_err, Result, ScalarValue};
-use datafusion_expr::utils::AggregateOrderSensitivity;
-use datafusion_expr::Accumulator;
+use datafusion_common::{exec_err, internal_err, not_impl_err, Result, 
ScalarValue};
+use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
+use datafusion_expr::utils::format_state_name;
+use datafusion_expr::{
+    Accumulator, AggregateUDF, AggregateUDFImpl, Expr, ReversedUDAF, Signature,
+    Volatility,
+};
+use 
datafusion_physical_expr_common::aggregate::merge_arrays::merge_ordered_arrays;
+use datafusion_physical_expr_common::aggregate::utils::ordering_fields;
+use datafusion_physical_expr_common::sort_expr::{
+    limited_convert_logical_sort_exprs_to_physical, LexOrdering, 
PhysicalSortExpr,
+};
+
+make_udaf_expr_and_func!(
+    NthValueAgg,
+    nth_value,
+    "Returns the nth value in a group of values.",
+    nth_value_udaf
+);
 
 /// Expression for a `NTH_VALUE(... ORDER BY ..., ...)` aggregation. In a multi
 /// partition setting, partial aggregations are computed for every partition,
 /// and then their results are merged.
 #[derive(Debug)]
 pub struct NthValueAgg {
-    /// Column name
-    name: String,
-    /// The `DataType` for the input expression
-    input_data_type: DataType,
-    /// The input expression
-    expr: Arc<dyn PhysicalExpr>,
-    /// The `N` value.
-    n: i64,
-    /// If the input expression can have `NULL`s
-    nullable: bool,
-    /// Ordering data types
-    order_by_data_types: Vec<DataType>,
-    /// Ordering requirement
-    ordering_req: LexOrdering,
+    signature: Signature,
+    /// Determines whether `N` is relative to the beginning or the end
+    /// of the aggregation. When set to `true`, then `N` is from the end.
+    reversed: bool,
 }
 
 impl NthValueAgg {
     /// Create a new `NthValueAgg` aggregate function
-    pub fn new(
-        expr: Arc<dyn PhysicalExpr>,
-        n: i64,
-        name: impl Into<String>,
-        input_data_type: DataType,
-        nullable: bool,
-        order_by_data_types: Vec<DataType>,
-        ordering_req: LexOrdering,
-    ) -> Self {
+    pub fn new() -> Self {
         Self {
-            name: name.into(),
-            input_data_type,
-            expr,
-            n,
-            nullable,
-            order_by_data_types,
-            ordering_req,
+            signature: Signature::any(2, Volatility::Immutable),
+            reversed: false,
         }
     }
+
+    pub fn with_reversed(mut self, reversed: bool) -> Self {
+        self.reversed = reversed;
+        self
+    }
 }
 
-impl AggregateExpr for NthValueAgg {
+impl Default for NthValueAgg {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl AggregateUDFImpl for NthValueAgg {
     fn as_any(&self) -> &dyn Any {
         self
     }
 
-    fn field(&self) -> Result<Field> {
-        Ok(Field::new(&self.name, self.input_data_type.clone(), true))
+    fn name(&self) -> &str {
+        "nth_value"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
     }
 
-    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::new(NthValueAccumulator::try_new(
-            self.n,
-            &self.input_data_type,
-            &self.order_by_data_types,
-            self.ordering_req.clone(),
-        )?))
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        Ok(arg_types[0].clone())
     }
 
-    fn state_fields(&self) -> Result<Vec<Field>> {
+    fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn 
Accumulator>> {
+        let n = match acc_args.input_exprs[1] {
+            Expr::Literal(ScalarValue::Int64(Some(value))) => {
+                if self.reversed {
+                    Ok(-value)
+                } else {
+                    Ok(value)
+                }
+            }
+            _ => not_impl_err!(
+                "{} not supported for n: {}",
+                self.name(),
+                &acc_args.input_exprs[1]
+            ),
+        }?;
+
+        let ordering_req = limited_convert_logical_sort_exprs_to_physical(
+            acc_args.sort_exprs,
+            acc_args.schema,
+        )?;
+
+        let ordering_dtypes = ordering_req
+            .iter()
+            .map(|e| e.expr.data_type(acc_args.schema))
+            .collect::<Result<Vec<_>>>()?;
+
+        NthValueAccumulator::try_new(
+            n,
+            acc_args.input_type,
+            &ordering_dtypes,
+            ordering_req,
+        )
+        .map(|acc| Box::new(acc) as _)
+    }
+
+    fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
         let mut fields = vec![Field::new_list(
-            format_state_name(&self.name, "nth_value"),
-            Field::new("item", self.input_data_type.clone(), true),
-            self.nullable, // This should be the same as field()
+            format_state_name(self.name(), "nth_value"),
+            // TODO: The nullability of the list element should be 
configurable.
+            // The hard-coded `true` should be changed once the field for
+            // nullability is added to `StateFieldArgs` struct.
+            // See: https://github.com/apache/datafusion/pull/11063
+            Field::new("item", args.input_type.clone(), true),
+            false,
         )];
-        if !self.ordering_req.is_empty() {
-            let orderings =
-                ordering_fields(&self.ordering_req, &self.order_by_data_types);
+        let orderings = args.ordering_fields.to_vec();
+        if !orderings.is_empty() {
             fields.push(Field::new_list(
-                format_state_name(&self.name, "nth_value_orderings"),
+                format_state_name(self.name(), "nth_value_orderings"),
                 Field::new("item", DataType::Struct(Fields::from(orderings)), 
true),
-                self.nullable,
+                false,
             ));
         }
         Ok(fields)
     }
 
-    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        let n = Arc::new(Literal::new(ScalarValue::Int64(Some(self.n)))) as _;
-        vec![Arc::clone(&self.expr), n]
+    fn aliases(&self) -> &[String] {
+        &[]
     }
 
-    fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
-        (!self.ordering_req.is_empty()).then_some(&self.ordering_req)
-    }
-
-    fn order_sensitivity(&self) -> AggregateOrderSensitivity {
-        AggregateOrderSensitivity::HardRequirement
-    }
-
-    fn name(&self) -> &str {
-        &self.name
-    }
-
-    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
-        Some(Arc::new(Self {
-            name: self.name.to_string(),
-            input_data_type: self.input_data_type.clone(),
-            expr: Arc::clone(&self.expr),
-            // index should be from the opposite side
-            n: -self.n,
-            nullable: self.nullable,
-            order_by_data_types: self.order_by_data_types.clone(),
-            // reverse requirement
-            ordering_req: reverse_order_bys(&self.ordering_req),
-        }) as _)
-    }
-}
-
-impl PartialEq<dyn Any> for NthValueAgg {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                self.name == x.name
-                    && self.input_data_type == x.input_data_type
-                    && self.order_by_data_types == x.order_by_data_types
-                    && self.expr.eq(&x.expr)
-            })
-            .unwrap_or(false)
+    fn reverse_expr(&self) -> ReversedUDAF {
+        ReversedUDAF::Reversed(Arc::from(AggregateUDF::from(
+            Self::new().with_reversed(!self.reversed),
+        )))
     }
 }
 
 #[derive(Debug)]
-pub(crate) struct NthValueAccumulator {
+pub struct NthValueAccumulator {
+    /// The `N` value.
     n: i64,
     /// Stores entries in the `NTH_VALUE` result.
     values: VecDeque<ScalarValue>,
diff --git a/datafusion/functions-array/Cargo.toml 
b/datafusion/functions-array/Cargo.toml
index eb1ef9e03f..73c5b9114a 100644
--- a/datafusion/functions-array/Cargo.toml
+++ b/datafusion/functions-array/Cargo.toml
@@ -49,6 +49,7 @@ datafusion-common = { workspace = true }
 datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true }
 datafusion-functions = { workspace = true }
+datafusion-functions-aggregate = { workspace = true }
 itertools = { version = "0.12", features = ["use_std"] }
 log = { workspace = true }
 paste = "1.0.14"
diff --git a/datafusion/functions-array/src/planner.rs 
b/datafusion/functions-array/src/planner.rs
index cfb3e5ed07..01853fb569 100644
--- a/datafusion/functions-array/src/planner.rs
+++ b/datafusion/functions-array/src/planner.rs
@@ -23,6 +23,7 @@ use datafusion_expr::{
     sqlparser, AggregateFunction, Expr, ExprSchemable, GetFieldAccess,
 };
 use datafusion_functions::expr_fn::get_field;
+use datafusion_functions_aggregate::nth_value::nth_value_udaf;
 
 use crate::{
     array_has::array_has_all,
@@ -119,8 +120,8 @@ impl UserDefinedSQLPlanner for FieldAccessPlanner {
                     // Special case for array_agg(expr)[index] to 
NTH_VALUE(expr, index)
                     Expr::AggregateFunction(agg_func) if 
is_array_agg(&agg_func) => {
                         Ok(PlannerResult::Planned(Expr::AggregateFunction(
-                            datafusion_expr::expr::AggregateFunction::new(
-                                AggregateFunction::NthValue,
+                            datafusion_expr::expr::AggregateFunction::new_udf(
+                                nth_value_udaf(),
                                 agg_func
                                     .args
                                     .into_iter()
diff --git a/datafusion/physical-expr-common/src/aggregate/merge_arrays.rs 
b/datafusion/physical-expr-common/src/aggregate/merge_arrays.rs
new file mode 100644
index 0000000000..544bdc1828
--- /dev/null
+++ b/datafusion/physical-expr-common/src/aggregate/merge_arrays.rs
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::compute::SortOptions;
+use datafusion_common::utils::compare_rows;
+use datafusion_common::{exec_err, ScalarValue};
+use std::cmp::Ordering;
+use std::collections::{BinaryHeap, VecDeque};
+
+/// This is a wrapper struct to be able to correctly merge `ARRAY_AGG` data 
from
+/// multiple partitions using `BinaryHeap`. When used inside `BinaryHeap`, this
+/// struct returns smallest `CustomElement`, where smallest is determined by
+/// `ordering` values (`Vec<ScalarValue>`) according to `sort_options`.
+#[derive(Debug, PartialEq, Eq)]
+struct CustomElement<'a> {
+    /// Stores the partition this entry came from
+    branch_idx: usize,
+    /// Values to merge
+    value: ScalarValue,
+    // Comparison "key"
+    ordering: Vec<ScalarValue>,
+    /// Options defining the ordering semantics
+    sort_options: &'a [SortOptions],
+}
+
+impl<'a> CustomElement<'a> {
+    fn new(
+        branch_idx: usize,
+        value: ScalarValue,
+        ordering: Vec<ScalarValue>,
+        sort_options: &'a [SortOptions],
+    ) -> Self {
+        Self {
+            branch_idx,
+            value,
+            ordering,
+            sort_options,
+        }
+    }
+
+    fn ordering(
+        &self,
+        current: &[ScalarValue],
+        target: &[ScalarValue],
+    ) -> datafusion_common::Result<Ordering> {
+        // Calculate ordering according to `sort_options`
+        compare_rows(current, target, self.sort_options)
+    }
+}
+
+// Overwrite ordering implementation such that
+// - `self.ordering` values are used for comparison,
+// - When used inside `BinaryHeap` it is a min-heap.
+impl<'a> Ord for CustomElement<'a> {
+    fn cmp(&self, other: &Self) -> Ordering {
+        // Compares according to custom ordering
+        self.ordering(&self.ordering, &other.ordering)
+            // Convert max heap to min heap
+            .map(|ordering| ordering.reverse())
+            // This function return error, when `self.ordering` and 
`other.ordering`
+            // have different types (such as one is `ScalarValue::Int64`, 
other is `ScalarValue::Float32`)
+            // Here this case won't happen, because data from each partition 
will have same type
+            .unwrap()
+    }
+}
+
+impl<'a> PartialOrd for CustomElement<'a> {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+/// This functions merges `values` array (`&[Vec<ScalarValue>]`) into single 
array `Vec<ScalarValue>`
+/// Merging done according to ordering values stored inside `ordering_values` 
(`&[Vec<Vec<ScalarValue>>]`)
+/// Inner `Vec<ScalarValue>` in the `ordering_values` can be thought as 
ordering information for the
+/// each `ScalarValue` in the `values` array.
+/// Desired ordering specified by `sort_options` argument (Should have same 
size with inner `Vec<ScalarValue>`
+/// of the `ordering_values` array).
+///
+/// As an example
+/// values can be \[
+///      \[1, 2, 3, 4, 5\],
+///      \[1, 2, 3, 4\],
+///      \[1, 2, 3, 4, 5, 6\],
+/// \]
+/// In this case we will be merging three arrays (doesn't have to be same size)
+/// and produce a merged array with size 15 (sum of 5+4+6)
+/// Merging will be done according to ordering at `ordering_values` vector.
+/// As an example `ordering_values` can be [
+///      \[(1, a), (2, b), (3, b), (4, a), (5, b) \],
+///      \[(1, a), (2, b), (3, b), (4, a) \],
+///      \[(1, b), (2, c), (3, d), (4, e), (5, a), (6, b) \],
+/// ]
+/// For each ScalarValue in the `values` we have a corresponding 
`Vec<ScalarValue>` (like timestamp of it)
+/// for the example above `sort_options` will have size two, that defines 
ordering requirement of the merge.
+/// Inner `Vec<ScalarValue>`s of the `ordering_values` will be compared 
according `sort_options` (Their sizes should match)
+pub fn merge_ordered_arrays(
+    // We will merge values into single `Vec<ScalarValue>`.
+    values: &mut [VecDeque<ScalarValue>],
+    // `values` will be merged according to `ordering_values`.
+    // Inner `Vec<ScalarValue>` can be thought as ordering information for the
+    // each `ScalarValue` in the values`.
+    ordering_values: &mut [VecDeque<Vec<ScalarValue>>],
+    // Defines according to which ordering comparisons should be done.
+    sort_options: &[SortOptions],
+) -> datafusion_common::Result<(Vec<ScalarValue>, Vec<Vec<ScalarValue>>)> {
+    // Keep track the most recent data of each branch, in binary heap data 
structure.
+    let mut heap = BinaryHeap::<CustomElement>::new();
+
+    if values.len() != ordering_values.len()
+        || values
+            .iter()
+            .zip(ordering_values.iter())
+            .any(|(vals, ordering_vals)| vals.len() != ordering_vals.len())
+    {
+        return exec_err!(
+            "Expects values arguments and/or ordering_values arguments to have 
same size"
+        );
+    }
+    let n_branch = values.len();
+    let mut merged_values = vec![];
+    let mut merged_orderings = vec![];
+    // Continue iterating the loop until consuming data of all branches.
+    loop {
+        let minimum = if let Some(minimum) = heap.pop() {
+            minimum
+        } else {
+            // Heap is empty, fill it with the next entries from each branch.
+            for branch_idx in 0..n_branch {
+                if let Some(orderings) = 
ordering_values[branch_idx].pop_front() {
+                    // Their size should be same, we can safely .unwrap here.
+                    let value = values[branch_idx].pop_front().unwrap();
+                    // Push the next element to the heap:
+                    heap.push(CustomElement::new(
+                        branch_idx,
+                        value,
+                        orderings,
+                        sort_options,
+                    ));
+                }
+                // If None, we consumed this branch, skip it.
+            }
+
+            // Now we have filled the heap, get the largest entry (this will be
+            // the next element in merge).
+            if let Some(minimum) = heap.pop() {
+                minimum
+            } else {
+                // Heap is empty, this means that all indices are same with
+                // `end_indices`. We have consumed all of the branches, merge
+                // is completed, exit from the loop:
+                break;
+            }
+        };
+        let CustomElement {
+            branch_idx,
+            value,
+            ordering,
+            ..
+        } = minimum;
+        // Add minimum value in the heap to the result
+        merged_values.push(value);
+        merged_orderings.push(ordering);
+
+        // If there is an available entry, push next entry in the most
+        // recently consumed branch to the heap.
+        if let Some(orderings) = ordering_values[branch_idx].pop_front() {
+            // Their size should be same, we can safely .unwrap here.
+            let value = values[branch_idx].pop_front().unwrap();
+            // Push the next element to the heap:
+            heap.push(CustomElement::new(
+                branch_idx,
+                value,
+                orderings,
+                sort_options,
+            ));
+        }
+    }
+
+    Ok((merged_values, merged_orderings))
+}
diff --git a/datafusion/physical-expr-common/src/aggregate/mod.rs 
b/datafusion/physical-expr-common/src/aggregate/mod.rs
index cd309b7f7d..35666f199a 100644
--- a/datafusion/physical-expr-common/src/aggregate/mod.rs
+++ b/datafusion/physical-expr-common/src/aggregate/mod.rs
@@ -17,6 +17,7 @@
 
 pub mod count_distinct;
 pub mod groups_accumulator;
+pub mod merge_arrays;
 pub mod stats;
 pub mod tdigest;
 pub mod utils;
diff --git a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs 
b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
index 3b122fe9f8..a64d97637c 100644
--- a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
@@ -19,8 +19,7 @@
 //! that can evaluated at runtime during query execution
 
 use std::any::Any;
-use std::cmp::Ordering;
-use std::collections::{BinaryHeap, VecDeque};
+use std::collections::VecDeque;
 use std::fmt::Debug;
 use std::sync::Arc;
 
@@ -33,11 +32,12 @@ use crate::{
 use arrow::datatypes::{DataType, Field};
 use arrow_array::cast::AsArray;
 use arrow_array::{new_empty_array, Array, ArrayRef, StructArray};
-use arrow_schema::{Fields, SortOptions};
-use datafusion_common::utils::{array_into_list_array, compare_rows, 
get_row_at_idx};
+use arrow_schema::Fields;
+use datafusion_common::utils::{array_into_list_array, get_row_at_idx};
 use datafusion_common::{exec_err, Result, ScalarValue};
 use datafusion_expr::utils::AggregateOrderSensitivity;
 use datafusion_expr::Accumulator;
+use 
datafusion_physical_expr_common::aggregate::merge_arrays::merge_ordered_arrays;
 
 /// Expression for a `ARRAY_AGG(... ORDER BY ..., ...)` aggregation. In a multi
 /// partition setting, partial aggregations are computed for every partition,
@@ -384,179 +384,6 @@ impl OrderSensitiveArrayAggAccumulator {
     }
 }
 
-/// This is a wrapper struct to be able to correctly merge `ARRAY_AGG` data 
from
-/// multiple partitions using `BinaryHeap`. When used inside `BinaryHeap`, this
-/// struct returns smallest `CustomElement`, where smallest is determined by
-/// `ordering` values (`Vec<ScalarValue>`) according to `sort_options`.
-#[derive(Debug, PartialEq, Eq)]
-struct CustomElement<'a> {
-    /// Stores the partition this entry came from
-    branch_idx: usize,
-    /// Values to merge
-    value: ScalarValue,
-    // Comparison "key"
-    ordering: Vec<ScalarValue>,
-    /// Options defining the ordering semantics
-    sort_options: &'a [SortOptions],
-}
-
-impl<'a> CustomElement<'a> {
-    fn new(
-        branch_idx: usize,
-        value: ScalarValue,
-        ordering: Vec<ScalarValue>,
-        sort_options: &'a [SortOptions],
-    ) -> Self {
-        Self {
-            branch_idx,
-            value,
-            ordering,
-            sort_options,
-        }
-    }
-
-    fn ordering(
-        &self,
-        current: &[ScalarValue],
-        target: &[ScalarValue],
-    ) -> Result<Ordering> {
-        // Calculate ordering according to `sort_options`
-        compare_rows(current, target, self.sort_options)
-    }
-}
-
-// Overwrite ordering implementation such that
-// - `self.ordering` values are used for comparison,
-// - When used inside `BinaryHeap` it is a min-heap.
-impl<'a> Ord for CustomElement<'a> {
-    fn cmp(&self, other: &Self) -> Ordering {
-        // Compares according to custom ordering
-        self.ordering(&self.ordering, &other.ordering)
-            // Convert max heap to min heap
-            .map(|ordering| ordering.reverse())
-            // This function return error, when `self.ordering` and 
`other.ordering`
-            // have different types (such as one is `ScalarValue::Int64`, 
other is `ScalarValue::Float32`)
-            // Here this case won't happen, because data from each partition 
will have same type
-            .unwrap()
-    }
-}
-
-impl<'a> PartialOrd for CustomElement<'a> {
-    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
-        Some(self.cmp(other))
-    }
-}
-
-/// This functions merges `values` array (`&[Vec<ScalarValue>]`) into single 
array `Vec<ScalarValue>`
-/// Merging done according to ordering values stored inside `ordering_values` 
(`&[Vec<Vec<ScalarValue>>]`)
-/// Inner `Vec<ScalarValue>` in the `ordering_values` can be thought as 
ordering information for the
-/// each `ScalarValue` in the `values` array.
-/// Desired ordering specified by `sort_options` argument (Should have same 
size with inner `Vec<ScalarValue>`
-/// of the `ordering_values` array).
-///
-/// As an example
-/// values can be \[
-///      \[1, 2, 3, 4, 5\],
-///      \[1, 2, 3, 4\],
-///      \[1, 2, 3, 4, 5, 6\],
-/// \]
-/// In this case we will be merging three arrays (doesn't have to be same size)
-/// and produce a merged array with size 15 (sum of 5+4+6)
-/// Merging will be done according to ordering at `ordering_values` vector.
-/// As an example `ordering_values` can be [
-///      \[(1, a), (2, b), (3, b), (4, a), (5, b) \],
-///      \[(1, a), (2, b), (3, b), (4, a) \],
-///      \[(1, b), (2, c), (3, d), (4, e), (5, a), (6, b) \],
-/// ]
-/// For each ScalarValue in the `values` we have a corresponding 
`Vec<ScalarValue>` (like timestamp of it)
-/// for the example above `sort_options` will have size two, that defines 
ordering requirement of the merge.
-/// Inner `Vec<ScalarValue>`s of the `ordering_values` will be compared 
according `sort_options` (Their sizes should match)
-pub(crate) fn merge_ordered_arrays(
-    // We will merge values into single `Vec<ScalarValue>`.
-    values: &mut [VecDeque<ScalarValue>],
-    // `values` will be merged according to `ordering_values`.
-    // Inner `Vec<ScalarValue>` can be thought as ordering information for the
-    // each `ScalarValue` in the values`.
-    ordering_values: &mut [VecDeque<Vec<ScalarValue>>],
-    // Defines according to which ordering comparisons should be done.
-    sort_options: &[SortOptions],
-) -> Result<(Vec<ScalarValue>, Vec<Vec<ScalarValue>>)> {
-    // Keep track the most recent data of each branch, in binary heap data 
structure.
-    let mut heap = BinaryHeap::<CustomElement>::new();
-
-    if values.len() != ordering_values.len()
-        || values
-            .iter()
-            .zip(ordering_values.iter())
-            .any(|(vals, ordering_vals)| vals.len() != ordering_vals.len())
-    {
-        return exec_err!(
-            "Expects values arguments and/or ordering_values arguments to have 
same size"
-        );
-    }
-    let n_branch = values.len();
-    let mut merged_values = vec![];
-    let mut merged_orderings = vec![];
-    // Continue iterating the loop until consuming data of all branches.
-    loop {
-        let minimum = if let Some(minimum) = heap.pop() {
-            minimum
-        } else {
-            // Heap is empty, fill it with the next entries from each branch.
-            for branch_idx in 0..n_branch {
-                if let Some(orderings) = 
ordering_values[branch_idx].pop_front() {
-                    // Their size should be same, we can safely .unwrap here.
-                    let value = values[branch_idx].pop_front().unwrap();
-                    // Push the next element to the heap:
-                    heap.push(CustomElement::new(
-                        branch_idx,
-                        value,
-                        orderings,
-                        sort_options,
-                    ));
-                }
-                // If None, we consumed this branch, skip it.
-            }
-
-            // Now we have filled the heap, get the largest entry (this will be
-            // the next element in merge).
-            if let Some(minimum) = heap.pop() {
-                minimum
-            } else {
-                // Heap is empty, this means that all indices are same with
-                // `end_indices`. We have consumed all of the branches, merge
-                // is completed, exit from the loop:
-                break;
-            }
-        };
-        let CustomElement {
-            branch_idx,
-            value,
-            ordering,
-            ..
-        } = minimum;
-        // Add minimum value in the heap to the result
-        merged_values.push(value);
-        merged_orderings.push(ordering);
-
-        // If there is an available entry, push next entry in the most
-        // recently consumed branch to the heap.
-        if let Some(orderings) = ordering_values[branch_idx].pop_front() {
-            // Their size should be same, we can safely .unwrap here.
-            let value = values[branch_idx].pop_front().unwrap();
-            // Push the next element to the heap:
-            heap.push(CustomElement::new(
-                branch_idx,
-                value,
-                orderings,
-                sort_options,
-            ));
-        }
-    }
-
-    Ok((merged_values, merged_orderings))
-}
-
 #[cfg(test)]
 mod tests {
     use std::collections::VecDeque;
diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs 
b/datafusion/physical-expr/src/aggregate/build_in.rs
index 1eadf7247f..d4cd3d51d1 100644
--- a/datafusion/physical-expr/src/aggregate/build_in.rs
+++ b/datafusion/physical-expr/src/aggregate/build_in.rs
@@ -30,10 +30,10 @@ use std::sync::Arc;
 
 use arrow::datatypes::Schema;
 
-use datafusion_common::{exec_err, not_impl_err, Result};
+use datafusion_common::{not_impl_err, Result};
 use datafusion_expr::AggregateFunction;
 
-use crate::expressions::{self, Literal};
+use crate::expressions::{self};
 use crate::{AggregateExpr, PhysicalExpr, PhysicalSortExpr};
 
 /// Create a physical aggregation expression.
@@ -102,26 +102,6 @@ pub fn create_aggregate_expr(
             name,
             data_type,
         )),
-        (AggregateFunction::NthValue, _) => {
-            let expr = &input_phy_exprs[0];
-            let Some(n) = input_phy_exprs[1]
-                .as_any()
-                .downcast_ref::<Literal>()
-                .map(|literal| literal.value())
-            else {
-                return exec_err!("Second argument of NTH_VALUE needs to be a 
literal");
-            };
-            let nullable = expr.nullable(input_schema)?;
-            Arc::new(expressions::NthValueAgg::new(
-                Arc::clone(expr),
-                n.clone().try_into()?,
-                name,
-                input_phy_types[0].clone(),
-                nullable,
-                ordering_types,
-                ordering_req.to_vec(),
-            ))
-        }
     })
 }
 
diff --git a/datafusion/physical-expr/src/aggregate/mod.rs 
b/datafusion/physical-expr/src/aggregate/mod.rs
index f0de7446f6..b9d803900f 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/mod.rs
@@ -20,7 +20,6 @@ pub use 
datafusion_physical_expr_common::aggregate::AggregateExpr;
 pub(crate) mod array_agg;
 pub(crate) mod array_agg_distinct;
 pub(crate) mod array_agg_ordered;
-pub(crate) mod nth_value;
 #[macro_use]
 pub(crate) mod min_max;
 pub(crate) mod groups_accumulator;
diff --git a/datafusion/physical-expr/src/expressions/mod.rs 
b/datafusion/physical-expr/src/expressions/mod.rs
index 1f2c955ad0..7d8f12091f 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -39,7 +39,6 @@ pub use 
crate::aggregate::array_agg_distinct::DistinctArrayAgg;
 pub use crate::aggregate::array_agg_ordered::OrderSensitiveArrayAgg;
 pub use crate::aggregate::build_in::create_aggregate_expr;
 pub use crate::aggregate::min_max::{Max, MaxAccumulator, Min, MinAccumulator};
-pub use crate::aggregate::nth_value::NthValueAgg;
 pub use crate::aggregate::stats::StatsType;
 pub use crate::window::cume_dist::{cume_dist, CumeDist};
 pub use crate::window::lead_lag::{lag, lead, WindowShift};
diff --git a/datafusion/proto/proto/datafusion.proto 
b/datafusion/proto/proto/datafusion.proto
index ce6c0c53c3..345765b08b 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -500,7 +500,7 @@ enum AggregateFunction {
   // REGR_SYY = 33;
   // REGR_SXY = 34;
   // STRING_AGG = 35;
-  NTH_VALUE_AGG = 36;
+  // NTH_VALUE_AGG = 36;
 }
 
 message AggregateExprNode {
diff --git a/datafusion/proto/src/generated/pbjson.rs 
b/datafusion/proto/src/generated/pbjson.rs
index 347654e52b..905f0d9849 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -535,7 +535,6 @@ impl serde::Serialize for AggregateFunction {
             Self::Min => "MIN",
             Self::Max => "MAX",
             Self::ArrayAgg => "ARRAY_AGG",
-            Self::NthValueAgg => "NTH_VALUE_AGG",
         };
         serializer.serialize_str(variant)
     }
@@ -550,7 +549,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction {
             "MIN",
             "MAX",
             "ARRAY_AGG",
-            "NTH_VALUE_AGG",
         ];
 
         struct GeneratedVisitor;
@@ -594,7 +592,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction {
                     "MIN" => Ok(AggregateFunction::Min),
                     "MAX" => Ok(AggregateFunction::Max),
                     "ARRAY_AGG" => Ok(AggregateFunction::ArrayAgg),
-                    "NTH_VALUE_AGG" => Ok(AggregateFunction::NthValueAgg),
                     _ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
                 }
             }
diff --git a/datafusion/proto/src/generated/prost.rs 
b/datafusion/proto/src/generated/prost.rs
index c74f172482..b16d26ee6e 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1924,7 +1924,7 @@ pub enum AggregateFunction {
     /// AVG = 3;
     /// COUNT = 4;
     /// APPROX_DISTINCT = 5;
-    ArrayAgg = 6,
+    ///
     /// VARIANCE = 7;
     /// VARIANCE_POP = 8;
     /// COVARIANCE = 9;
@@ -1952,7 +1952,8 @@ pub enum AggregateFunction {
     /// REGR_SYY = 33;
     /// REGR_SXY = 34;
     /// STRING_AGG = 35;
-    NthValueAgg = 36,
+    /// NTH_VALUE_AGG = 36;
+    ArrayAgg = 6,
 }
 impl AggregateFunction {
     /// String value of the enum field names used in the ProtoBuf definition.
@@ -1964,7 +1965,6 @@ impl AggregateFunction {
             AggregateFunction::Min => "MIN",
             AggregateFunction::Max => "MAX",
             AggregateFunction::ArrayAgg => "ARRAY_AGG",
-            AggregateFunction::NthValueAgg => "NTH_VALUE_AGG",
         }
     }
     /// Creates an enum from field names used in the ProtoBuf definition.
@@ -1973,7 +1973,6 @@ impl AggregateFunction {
             "MIN" => Some(Self::Min),
             "MAX" => Some(Self::Max),
             "ARRAY_AGG" => Some(Self::ArrayAgg),
-            "NTH_VALUE_AGG" => Some(Self::NthValueAgg),
             _ => None,
         }
     }
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs 
b/datafusion/proto/src/logical_plan/from_proto.rs
index f4fb692804..a58af8afdd 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -145,7 +145,6 @@ impl From<protobuf::AggregateFunction> for 
AggregateFunction {
             protobuf::AggregateFunction::Min => Self::Min,
             protobuf::AggregateFunction::Max => Self::Max,
             protobuf::AggregateFunction::ArrayAgg => Self::ArrayAgg,
-            protobuf::AggregateFunction::NthValueAgg => Self::NthValue,
         }
     }
 }
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs 
b/datafusion/proto/src/logical_plan/to_proto.rs
index 7570040a1d..d8f8ea002b 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -117,7 +117,6 @@ impl From<&AggregateFunction> for 
protobuf::AggregateFunction {
             AggregateFunction::Min => Self::Min,
             AggregateFunction::Max => Self::Max,
             AggregateFunction::ArrayAgg => Self::ArrayAgg,
-            AggregateFunction::NthValue => Self::NthValueAgg,
         }
     }
 }
@@ -377,9 +376,6 @@ pub fn serialize_expr(
                     AggregateFunction::ArrayAgg => 
protobuf::AggregateFunction::ArrayAgg,
                     AggregateFunction::Min => protobuf::AggregateFunction::Min,
                     AggregateFunction::Max => protobuf::AggregateFunction::Max,
-                    AggregateFunction::NthValue => {
-                        protobuf::AggregateFunction::NthValueAgg
-                    }
                 };
 
                 let aggregate_expr = protobuf::AggregateExprNode {
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs 
b/datafusion/proto/src/physical_plan/to_proto.rs
index 23cdc666e7..5e982ad2af 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -25,8 +25,8 @@ use datafusion::physical_expr::{PhysicalSortExpr, 
ScalarFunctionExpr};
 use datafusion::physical_plan::expressions::{
     ArrayAgg, BinaryExpr, CaseExpr, CastExpr, Column, CumeDist, 
DistinctArrayAgg,
     InListExpr, IsNotNullExpr, IsNullExpr, Literal, Max, Min, NegativeExpr, 
NotExpr,
-    NthValue, NthValueAgg, Ntile, OrderSensitiveArrayAgg, Rank, RankType, 
RowNumber,
-    TryCastExpr, WindowShift,
+    NthValue, Ntile, OrderSensitiveArrayAgg, Rank, RankType, RowNumber, 
TryCastExpr,
+    WindowShift,
 };
 use datafusion::physical_plan::udaf::AggregateFunctionExpr;
 use datafusion::physical_plan::windows::{BuiltInWindowExpr, 
PlainAggregateWindowExpr};
@@ -255,8 +255,6 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) -> 
Result<AggrFn> {
         protobuf::AggregateFunction::Min
     } else if aggr_expr.downcast_ref::<Max>().is_some() {
         protobuf::AggregateFunction::Max
-    } else if aggr_expr.downcast_ref::<NthValueAgg>().is_some() {
-        protobuf::AggregateFunction::NthValueAgg
     } else {
         return not_impl_err!("Aggregate function not supported: {expr:?}");
     };
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 106247b2d4..d8d85ace1a 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -38,7 +38,7 @@ use datafusion::datasource::physical_plan::{
 };
 use datafusion::execution::FunctionRegistry;
 use datafusion::logical_expr::{create_udf, JoinType, Operator, Volatility};
-use datafusion::physical_expr::expressions::{Max, NthValueAgg};
+use datafusion::physical_expr::expressions::Max;
 use datafusion::physical_expr::window::SlidingAggregateWindowExpr;
 use datafusion::physical_expr::{PhysicalSortRequirement, ScalarFunctionExpr};
 use datafusion::physical_plan::aggregates::{
@@ -81,6 +81,7 @@ use datafusion_expr::{
     ScalarUDFImpl, Signature, SimpleAggregateUDF, WindowFrame, 
WindowFrameBound,
 };
 use datafusion_functions_aggregate::average::avg_udaf;
+use datafusion_functions_aggregate::nth_value::nth_value_udaf;
 use datafusion_functions_aggregate::string_agg::StringAgg;
 use datafusion_proto::physical_plan::{
     AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
@@ -362,15 +363,17 @@ fn rountrip_aggregate() -> Result<()> {
             false,
         )?],
         // NTH_VALUE
-        vec![Arc::new(NthValueAgg::new(
-            col("b", &schema)?,
-            1,
-            "NTH_VALUE(b, 1)".to_string(),
-            DataType::Int64,
+        vec![udaf::create_aggregate_expr(
+            &nth_value_udaf(),
+            &[col("b", &schema)?, lit(1u64)],
+            &[],
+            &[],
+            &[],
+            &schema,
+            "NTH_VALUE(b, 1)",
             false,
-            Vec::new(),
-            Vec::new(),
-        ))],
+            false,
+        )?],
         // STRING_AGG
         vec![udaf::create_aggregate_expr(
             &AggregateUDF::new_from_impl(StringAgg::new()),
diff --git a/datafusion/sql/src/expr/function.rs 
b/datafusion/sql/src/expr/function.rs
index ea460cb3ef..d9ddf57eb1 100644
--- a/datafusion/sql/src/expr/function.rs
+++ b/datafusion/sql/src/expr/function.rs
@@ -415,9 +415,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
     ) -> Result<WindowFunctionDefinition> {
         // check udaf first
         let udaf = self.context_provider.get_aggregate_meta(name);
-        // Skip first value and last value, since we expect window builtin 
first/last value not udaf version
+        // Use the builtin window function instead of the user-defined 
aggregate function
         if udaf.as_ref().is_some_and(|udaf| {
-            udaf.name() != "first_value" && udaf.name() != "last_value"
+            udaf.name() != "first_value"
+                && udaf.name() != "last_value"
+                && udaf.name() != "nth_value"
         }) {
             Ok(WindowFunctionDefinition::AggregateUDF(udaf.unwrap()))
         } else {
diff --git a/datafusion/sqllogictest/test_files/agg_func_substitute.slt 
b/datafusion/sqllogictest/test_files/agg_func_substitute.slt
index 342d45e7fb..9a0a1d5874 100644
--- a/datafusion/sqllogictest/test_files/agg_func_substitute.slt
+++ b/datafusion/sqllogictest/test_files/agg_func_substitute.slt
@@ -39,16 +39,16 @@ EXPLAIN SELECT a, ARRAY_AGG(c ORDER BY c)[1] as result
                         GROUP BY a;
 ----
 logical_plan
-01)Projection: multiple_ordered_table.a, 
NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c 
ASC NULLS LAST] AS result
-02)--Aggregate: groupBy=[[multiple_ordered_table.a]], 
aggr=[[NTH_VALUE(multiple_ordered_table.c, Int64(1)) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST]]]
+01)Projection: multiple_ordered_table.a, 
nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c 
ASC NULLS LAST] AS result
+02)--Aggregate: groupBy=[[multiple_ordered_table.a]], 
aggr=[[nth_value(multiple_ordered_table.c, Int64(1)) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST]]]
 03)----TableScan: multiple_ordered_table projection=[a, c]
 physical_plan
-01)ProjectionExec: expr=[a@0 as a, 
NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c 
ASC NULLS LAST]@1 as result]
-02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], 
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
+01)ProjectionExec: expr=[a@0 as a, 
nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c 
ASC NULLS LAST]@1 as result]
+02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], 
aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
 03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
 04)------CoalesceBatchesExec: target_batch_size=8192
 05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
-06)----------AggregateExec: mode=Partial, gby=[a@0 as a], 
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
+06)----------AggregateExec: mode=Partial, gby=[a@0 as a], 
aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
 07)------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
 08)--------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], 
output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true
 
@@ -59,16 +59,16 @@ EXPLAIN SELECT a, NTH_VALUE(c, 1 ORDER BY c) as result
                         GROUP BY a;
 ----
 logical_plan
-01)Projection: multiple_ordered_table.a, 
NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c 
ASC NULLS LAST] AS result
-02)--Aggregate: groupBy=[[multiple_ordered_table.a]], 
aggr=[[NTH_VALUE(multiple_ordered_table.c, Int64(1)) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST]]]
+01)Projection: multiple_ordered_table.a, 
nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c 
ASC NULLS LAST] AS result
+02)--Aggregate: groupBy=[[multiple_ordered_table.a]], 
aggr=[[nth_value(multiple_ordered_table.c, Int64(1)) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST]]]
 03)----TableScan: multiple_ordered_table projection=[a, c]
 physical_plan
-01)ProjectionExec: expr=[a@0 as a, 
NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c 
ASC NULLS LAST]@1 as result]
-02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], 
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
+01)ProjectionExec: expr=[a@0 as a, 
nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c 
ASC NULLS LAST]@1 as result]
+02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], 
aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
 03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
 04)------CoalesceBatchesExec: target_batch_size=8192
 05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
-06)----------AggregateExec: mode=Partial, gby=[a@0 as a], 
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
+06)----------AggregateExec: mode=Partial, gby=[a@0 as a], 
aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
 07)------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
 08)--------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], 
output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true
 
@@ -78,16 +78,16 @@ EXPLAIN SELECT a, ARRAY_AGG(c ORDER BY c)[1 + 100] as result
                         GROUP BY a;
 ----
 logical_plan
-01)Projection: multiple_ordered_table.a, 
NTH_VALUE(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST] AS result
-02)--Aggregate: groupBy=[[multiple_ordered_table.a]], 
aggr=[[NTH_VALUE(multiple_ordered_table.c, Int64(101)) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST] AS 
NTH_VALUE(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST]]]
+01)Projection: multiple_ordered_table.a, 
nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST] AS result
+02)--Aggregate: groupBy=[[multiple_ordered_table.a]], 
aggr=[[nth_value(multiple_ordered_table.c, Int64(101)) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST] AS 
nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST]]]
 03)----TableScan: multiple_ordered_table projection=[a, c]
 physical_plan
-01)ProjectionExec: expr=[a@0 as a, NTH_VALUE(multiple_ordered_table.c,Int64(1) 
+ Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
-02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], 
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
+01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1) 
+ Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
+02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], 
aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
 03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
 04)------CoalesceBatchesExec: target_batch_size=8192
 05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
-06)----------AggregateExec: mode=Partial, gby=[a@0 as a], 
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
+06)----------AggregateExec: mode=Partial, gby=[a@0 as a], 
aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
 07)------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
 08)--------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], 
output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to