This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/asf-site by this push:
     new 4b58bf8e5e Publish built docs triggered by 
cd1c648e719fdfacbd7da586fed5251f5f26abde
4b58bf8e5e is described below

commit 4b58bf8e5e67a85dbb3892f3b3b85a607e6041b6
Author: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Nov 15 18:33:49 2023 +0000

    Publish built docs triggered by cd1c648e719fdfacbd7da586fed5251f5f26abde
---
 _sources/library-user-guide/adding-udfs.md.txt | 316 +++++++++++++++++++++++-
 library-user-guide/adding-udfs.html            | 317 ++++++++++++++++++++++++-
 searchindex.js                                 |   2 +-
 3 files changed, 626 insertions(+), 9 deletions(-)

diff --git a/_sources/library-user-guide/adding-udfs.md.txt 
b/_sources/library-user-guide/adding-udfs.md.txt
index a4b5ed0b40..1e710bc321 100644
--- a/_sources/library-user-guide/adding-udfs.md.txt
+++ b/_sources/library-user-guide/adding-udfs.md.txt
@@ -38,7 +38,7 @@ A Scalar UDF is a function that takes a row of data and 
returns a single value.
 ```rust
 use std::sync::Arc;
 
-use arrow::array::{ArrayRef, Int64Array};
+use datafusion::arrow::array::{ArrayRef, Int64Array};
 use datafusion::common::Result;
 
 use datafusion::common::cast::as_int64_array;
@@ -78,6 +78,11 @@ The challenge however is that DataFusion doesn't know about 
this function. We ne
 To register a Scalar UDF, you need to wrap the function implementation in a 
`ScalarUDF` struct and then register it with the `SessionContext`. DataFusion 
provides the `create_udf` and `make_scalar_function` helper functions to make 
this easier.
 
 ```rust
+use datafusion::logical_expr::{Volatility, create_udf};
+use datafusion::physical_plan::functions::make_scalar_function;
+use datafusion::arrow::datatypes::DataType;
+use std::sync::Arc;
+
 let udf = create_udf(
     "add_one",
     vec![DataType::Int64],
@@ -98,6 +103,8 @@ A few things to note:
 That gives us a `ScalarUDF` that we can register with the `SessionContext`:
 
 ```rust
+use datafusion::execution::context::SessionContext;
+
 let mut ctx = SessionContext::new();
 
 ctx.register_udf(udf);
@@ -115,10 +122,313 @@ let df = ctx.sql(&sql).await.unwrap();
 
 Scalar UDFs are functions that take a row of data and return a single value. 
Window UDFs are similar, but they also have access to the rows around them. 
Access to the proximal rows is helpful, but adds some complexity to the 
implementation.
 
-Body coming soon.
+For example, we will declare a user defined window function that computes a 
moving average.
+
+```rust
+use datafusion::arrow::{array::{ArrayRef, Float64Array, AsArray}, 
datatypes::Float64Type};
+use datafusion::logical_expr::{PartitionEvaluator};
+use datafusion::common::ScalarValue;
+use datafusion::error::Result;
+/// This implements the lowest level evaluation for a window function
+///
+/// It handles calculating the value of the window function for each
+/// distinct value of `PARTITION BY`
+#[derive(Clone, Debug)]
+struct MyPartitionEvaluator {}
+
+impl MyPartitionEvaluator {
+    fn new() -> Self {
+        Self {}
+    }
+}
+
+/// Different evaluation methods are called depending on the various
+/// settings of WindowUDF. This example uses the simplest and most
+/// general, `evaluate`. See `PartitionEvaluator` for the other more
+/// advanced uses.
+impl PartitionEvaluator for MyPartitionEvaluator {
+    /// Tell DataFusion the window function varies based on the value
+    /// of the window frame.
+    fn uses_window_frame(&self) -> bool {
+        true
+    }
+
+    /// This function is called once per input row.
+    ///
+    /// `range` specifies which indexes of `values` should be
+    /// considered for the calculation.
+    ///
+    /// Note this is the SLOWEST, but simplest, way to evaluate a
+    /// window function. It is much faster to implement
+    /// evaluate_all or evaluate_all_with_rank, if possible
+    fn evaluate(
+        &mut self,
+        values: &[ArrayRef],
+        range: &std::ops::Range<usize>,
+    ) -> Result<ScalarValue> {
+        // Again, the input argument is an array of floating
+        // point numbers to calculate a moving average
+        let arr: &Float64Array = 
values[0].as_ref().as_primitive::<Float64Type>();
+
+        let range_len = range.end - range.start;
+
+        // our smoothing function will average all the values in `range`
+        let output = if range_len > 0 {
+            let sum: f64 = 
arr.values().iter().skip(range.start).take(range_len).sum();
+            Some(sum / range_len as f64)
+        } else {
+            None
+        };
+
+        Ok(ScalarValue::Float64(output))
+    }
+}
+
+/// Create a `PartitionEvaluator` to evaluate this function on a new
+/// partition.
+fn make_partition_evaluator() -> Result<Box<dyn PartitionEvaluator>> {
+    Ok(Box::new(MyPartitionEvaluator::new()))
+}
+```
+
+### Registering a Window UDF
+
+To register a Window UDF, you need to wrap the function implementation in a 
`WindowUDF` struct and then register it with the `SessionContext`. DataFusion 
provides the `create_udwf` helper function to make this easier.
+
+```rust
+use datafusion::logical_expr::{Volatility, create_udwf};
+use datafusion::arrow::datatypes::DataType;
+use std::sync::Arc;
+
+// here is where we define the UDWF. We also declare its signature:
+let smooth_it = create_udwf(
+    "smooth_it",
+    DataType::Float64,
+    Arc::new(DataType::Float64),
+    Volatility::Immutable,
+    Arc::new(make_partition_evaluator),
+);
+```
+
+The `create_udwf` has five arguments to check:
+
+- The first argument is the name of the function. This is the name that will 
be used in SQL queries.
+- **The second argument** is the `DataType` of the input array (note: this is a 
single `DataType`, not a list of them). I.e. in this case, the function accepts 
`Float64` as its argument.
+- The third argument is the return type of the function. I.e. in this case, 
the function returns a `Float64`.
+- The fourth argument is the volatility of the function. In short, this is 
used to determine if the function's performance can be optimized in some 
situations. In this case, the function is `Immutable` because it always returns 
the same value for the same input. A random number generator would be 
`Volatile` because it returns a different value for the same input.
+- **The fifth argument** is the function implementation. This is the function 
that we defined above.
+
+That gives us a `WindowUDF` that we can register with the `SessionContext`:
+
+```rust
+use datafusion::execution::context::SessionContext;
+
+let ctx = SessionContext::new();
+
+ctx.register_udwf(smooth_it);
+```
+
+At this point, you can use the `smooth_it` function in your query:
+
+For example, if we have a 
[`cars.csv`](https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/tests/data/cars.csv)
 whose contents look like
+
+```csv
+car,speed,time
+red,20.0,1996-04-12T12:05:03.000000000
+red,20.3,1996-04-12T12:05:04.000000000
+green,10.0,1996-04-12T12:05:03.000000000
+green,10.3,1996-04-12T12:05:04.000000000
+...
+```
+
+Then, we can query like below:
+
+```rust
+use datafusion::datasource::file_format::options::CsvReadOptions;
+// register csv table first
+let csv_path = "cars.csv".to_string();
+ctx.register_csv("cars", &csv_path, 
CsvReadOptions::default().has_header(true)).await?;
+// do query with smooth_it
+let df = ctx
+    .sql(
+        "SELECT \
+           car, \
+           speed, \
+           smooth_it(speed) OVER (PARTITION BY car ORDER BY time) as 
smooth_speed,\
+           time \
+           from cars \
+         ORDER BY \
+           car",
+    )
+    .await?;
+// print the results
+df.show().await?;
+```
+
+The output will look like:
+
+```csv
++-------+-------+--------------------+---------------------+
+| car   | speed | smooth_speed       | time                |
++-------+-------+--------------------+---------------------+
+| green | 10.0  | 10.0               | 1996-04-12T12:05:03 |
+| green | 10.3  | 10.15              | 1996-04-12T12:05:04 |
+| green | 10.4  | 10.233333333333334 | 1996-04-12T12:05:05 |
+| green | 10.5  | 10.3               | 1996-04-12T12:05:06 |
+| green | 11.0  | 10.440000000000001 | 1996-04-12T12:05:07 |
+| green | 12.0  | 10.700000000000001 | 1996-04-12T12:05:08 |
+| green | 14.0  | 11.171428571428573 | 1996-04-12T12:05:09 |
+| green | 15.0  | 11.65              | 1996-04-12T12:05:10 |
+| green | 15.1  | 12.033333333333333 | 1996-04-12T12:05:11 |
+| green | 15.2  | 12.35              | 1996-04-12T12:05:12 |
+| green | 8.0   | 11.954545454545455 | 1996-04-12T12:05:13 |
+| green | 2.0   | 11.125             | 1996-04-12T12:05:14 |
+| red   | 20.0  | 20.0               | 1996-04-12T12:05:03 |
+| red   | 20.3  | 20.15              | 1996-04-12T12:05:04 |
+...
+```
 
 ## Adding an Aggregate UDF
 
 Aggregate UDFs are functions that take a group of rows and return a single 
value. These are akin to SQL's `SUM` or `COUNT` functions.
 
-Body coming soon.
+For example, we will declare a single-type, single return type UDAF that 
computes the geometric mean.
+
+```rust
+use datafusion::arrow::array::ArrayRef;
+use datafusion::scalar::ScalarValue;
+use datafusion::{error::Result, physical_plan::Accumulator};
+
+/// A UDAF has state across multiple rows, and thus we require a `struct` with 
that state.
+#[derive(Debug)]
+struct GeometricMean {
+    n: u32,
+    prod: f64,
+}
+
+impl GeometricMean {
+    // how the struct is initialized
+    pub fn new() -> Self {
+        GeometricMean { n: 0, prod: 1.0 }
+    }
+}
+
+// UDAFs are built using the trait `Accumulator`, that offers DataFusion the 
necessary functions
+// to use them.
+impl Accumulator for GeometricMean {
+    // This function serializes our state to `ScalarValue`, which DataFusion 
uses
+    // to pass this state between execution stages.
+    // Note that this can be arbitrary data.
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![
+            ScalarValue::from(self.prod),
+            ScalarValue::from(self.n),
+        ])
+    }
+
+    // DataFusion expects this function to return the final value of this 
aggregator.
+    // in this case, this is the formula of the geometric mean
+    fn evaluate(&self) -> Result<ScalarValue> {
+        let value = self.prod.powf(1.0 / self.n as f64);
+        Ok(ScalarValue::from(value))
+    }
+
+    // DataFusion calls this function to update the accumulator's state for a 
batch
+    // of input rows. In this case the product is updated with values from 
the first column
+    // and the count is updated based on the row count
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+        let arr = &values[0];
+        (0..arr.len()).try_for_each(|index| {
+            let v = ScalarValue::try_from_array(arr, index)?;
+
+            if let ScalarValue::Float64(Some(value)) = v {
+                self.prod *= value;
+                self.n += 1;
+            } else {
+                unreachable!("")
+            }
+            Ok(())
+        })
+    }
+
+    // Merges serialized states (in the layout returned by `state`) into this 
accumulator.
+    // It operates on arrays rather than single values: `states[0]` holds the 
products and `states[1]` the counts.
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        if states.is_empty() {
+            return Ok(());
+        }
+        let arr = &states[0];
+        (0..arr.len()).try_for_each(|index| {
+            let v = states
+                .iter()
+                .map(|array| ScalarValue::try_from_array(array, index))
+                .collect::<Result<Vec<_>>>()?;
+            if let (ScalarValue::Float64(Some(prod)), 
ScalarValue::UInt32(Some(n))) = (&v[0], &v[1])
+            {
+                self.prod *= prod;
+                self.n += n;
+            } else {
+                unreachable!("")
+            }
+            Ok(())
+        })
+    }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self)
+    }
+}
+```
+
+### Registering an Aggregate UDF
+
+To register an Aggregate UDF, you need to wrap the function implementation in an 
`AggregateUDF` struct and then register it with the `SessionContext`. 
DataFusion provides the `create_udaf` helper function to make this easier.
+
+```rust
+use datafusion::logical_expr::{Volatility, create_udaf};
+use datafusion::arrow::datatypes::DataType;
+use std::sync::Arc;
+
+// here is where we define the UDAF. We also declare its signature:
+let geometric_mean = create_udaf(
+    // the name; used to represent it in plan descriptions and in the 
registry, to use in SQL.
+    "geo_mean",
+    // the input type; DataFusion guarantees that the first entry of `values` 
in `update_batch` has this type.
+    vec![DataType::Float64],
+    // the return type; DataFusion expects this to match the type returned by 
`evaluate`.
+    Arc::new(DataType::Float64),
+    Volatility::Immutable,
+    // This is the accumulator factory; DataFusion uses it to create new 
accumulators.
+    Arc::new(|_| Ok(Box::new(GeometricMean::new()))),
+    // This is the description of the state. `state()` must match the types 
here.
+    Arc::new(vec![DataType::Float64, DataType::UInt32]),
+);
+```
+
+The `create_udaf` has six arguments to check:
+
+- The first argument is the name of the function. This is the name that will 
be used in SQL queries.
+- The second argument is a vector of `DataType`s. This is the list of argument 
types that the function accepts. I.e. in this case, the function accepts a 
single `Float64` argument.
+- The third argument is the return type of the function. I.e. in this case, 
the function returns a `Float64`.
+- The fourth argument is the volatility of the function. In short, this is 
used to determine if the function's performance can be optimized in some 
situations. In this case, the function is `Immutable` because it always returns 
the same value for the same input. A random number generator would be 
`Volatile` because it returns a different value for the same input.
+- The fifth argument is the accumulator factory. This is the closure that 
creates a new `GeometricMean` accumulator, which we defined above.
+- The sixth argument is the description of the state, which will be passed 
between execution stages.
+
+That gives us an `AggregateUDF` that we can register with the `SessionContext`:
+
+```rust
+use datafusion::execution::context::SessionContext;
+
+let ctx = SessionContext::new();
+
+ctx.register_udaf(geometric_mean);
+```
+
+Then, we can query like below:
+
+```rust
+let df = ctx.sql("SELECT geo_mean(a) FROM t").await?;
+```
diff --git a/library-user-guide/adding-udfs.html 
b/library-user-guide/adding-udfs.html
index e9132e93ab..39d2c3d3ec 100644
--- a/library-user-guide/adding-udfs.html
+++ b/library-user-guide/adding-udfs.html
@@ -362,11 +362,25 @@
   <a class="reference internal nav-link" href="#adding-a-window-udf">
    Adding a Window UDF
   </a>
+  <ul class="nav section-nav flex-column">
+   <li class="toc-h3 nav-item toc-entry">
+    <a class="reference internal nav-link" href="#registering-a-window-udf">
+     Registering a Window UDF
+    </a>
+   </li>
+  </ul>
  </li>
  <li class="toc-h2 nav-item toc-entry">
   <a class="reference internal nav-link" href="#adding-an-aggregate-udf">
    Adding an Aggregate UDF
   </a>
+  <ul class="nav section-nav flex-column">
+   <li class="toc-h3 nav-item toc-entry">
+    <a class="reference internal nav-link" 
href="#registering-an-aggregate-udf">
+     Registering an Aggregate UDF
+    </a>
+   </li>
+  </ul>
  </li>
 </ul>
 
@@ -446,7 +460,7 @@
 <p>A Scalar UDF is a function that takes a row of data and returns a single 
value. For example, this function takes a single i64 and returns a single i64 
with 1 added to it:</p>
 <div class="highlight-rust notranslate"><div 
class="highlight"><pre><span></span><span class="k">use</span><span class="w"> 
</span><span class="n">std</span>::<span class="n">sync</span>::<span 
class="n">Arc</span><span class="p">;</span>
 
-<span class="k">use</span><span class="w"> </span><span 
class="n">arrow</span>::<span class="n">array</span>::<span 
class="p">{</span><span class="n">ArrayRef</span><span class="p">,</span><span 
class="w"> </span><span class="n">Int64Array</span><span class="p">};</span>
+<span class="k">use</span><span class="w"> </span><span 
class="n">datafusion</span>::<span class="n">arrow</span>::<span 
class="n">array</span>::<span class="p">{</span><span 
class="n">ArrayRef</span><span class="p">,</span><span class="w"> </span><span 
class="n">Int64Array</span><span class="p">};</span>
 <span class="k">use</span><span class="w"> </span><span 
class="n">datafusion</span>::<span class="n">common</span>::<span 
class="nb">Result</span><span class="p">;</span>
 
 <span class="k">use</span><span class="w"> </span><span 
class="n">datafusion</span>::<span class="n">common</span>::<span 
class="n">cast</span>::<span class="n">as_int64_array</span><span 
class="p">;</span>
@@ -480,7 +494,12 @@
 <section id="registering-a-scalar-udf">
 <h3>Registering a Scalar UDF<a class="headerlink" 
href="#registering-a-scalar-udf" title="Link to this heading">¶</a></h3>
 <p>To register a Scalar UDF, you need to wrap the function implementation in a 
<code class="docutils literal notranslate"><span 
class="pre">ScalarUDF</span></code> struct and then register it with the <code 
class="docutils literal notranslate"><span 
class="pre">SessionContext</span></code>. DataFusion provides the <code 
class="docutils literal notranslate"><span class="pre">create_udf</span></code> 
and <code class="docutils literal notranslate"><span 
class="pre">make_scalar_function</spa [...]
-<div class="highlight-rust notranslate"><div 
class="highlight"><pre><span></span><span class="kd">let</span><span class="w"> 
</span><span class="n">udf</span><span class="w"> </span><span 
class="o">=</span><span class="w"> </span><span 
class="n">create_udf</span><span class="p">(</span>
+<div class="highlight-rust notranslate"><div 
class="highlight"><pre><span></span><span class="k">use</span><span class="w"> 
</span><span class="n">datafusion</span>::<span 
class="n">logical_expr</span>::<span class="p">{</span><span 
class="n">Volatility</span><span class="p">,</span><span class="w"> 
</span><span class="n">create_udf</span><span class="p">};</span>
+<span class="k">use</span><span class="w"> </span><span 
class="n">datafusion</span>::<span class="n">physical_plan</span>::<span 
class="n">functions</span>::<span class="n">make_scalar_function</span><span 
class="p">;</span>
+<span class="k">use</span><span class="w"> </span><span 
class="n">datafusion</span>::<span class="n">arrow</span>::<span 
class="n">datatypes</span>::<span class="n">DataType</span><span 
class="p">;</span>
+<span class="k">use</span><span class="w"> </span><span 
class="n">std</span>::<span class="n">sync</span>::<span 
class="n">Arc</span><span class="p">;</span>
+
+<span class="kd">let</span><span class="w"> </span><span 
class="n">udf</span><span class="w"> </span><span class="o">=</span><span 
class="w"> </span><span class="n">create_udf</span><span class="p">(</span>
 <span class="w">    </span><span class="s">&quot;add_one&quot;</span><span 
class="p">,</span>
 <span class="w">    </span><span class="fm">vec!</span><span 
class="p">[</span><span class="n">DataType</span>::<span 
class="n">Int64</span><span class="p">],</span>
 <span class="w">    </span><span class="n">Arc</span>::<span 
class="n">new</span><span class="p">(</span><span 
class="n">DataType</span>::<span class="n">Int64</span><span class="p">),</span>
@@ -498,7 +517,9 @@
 <li><p>The fifth argument is the function implementation. This is the function 
that we defined above.</p></li>
 </ul>
 <p>That gives us a <code class="docutils literal notranslate"><span 
class="pre">ScalarUDF</span></code> that we can register with the <code 
class="docutils literal notranslate"><span 
class="pre">SessionContext</span></code>:</p>
-<div class="highlight-rust notranslate"><div 
class="highlight"><pre><span></span><span class="kd">let</span><span class="w"> 
</span><span class="k">mut</span><span class="w"> </span><span 
class="n">ctx</span><span class="w"> </span><span class="o">=</span><span 
class="w"> </span><span class="n">SessionContext</span>::<span 
class="n">new</span><span class="p">();</span>
+<div class="highlight-rust notranslate"><div 
class="highlight"><pre><span></span><span class="k">use</span><span class="w"> 
</span><span class="n">datafusion</span>::<span 
class="n">execution</span>::<span class="n">context</span>::<span 
class="n">SessionContext</span><span class="p">;</span>
+
+<span class="kd">let</span><span class="w"> </span><span 
class="k">mut</span><span class="w"> </span><span class="n">ctx</span><span 
class="w"> </span><span class="o">=</span><span class="w"> </span><span 
class="n">SessionContext</span>::<span class="n">new</span><span 
class="p">();</span>
 
 <span class="n">ctx</span><span class="p">.</span><span 
class="n">register_udf</span><span class="p">(</span><span 
class="n">udf</span><span class="p">);</span>
 </pre></div>
@@ -514,12 +535,298 @@
 <section id="adding-a-window-udf">
 <h2>Adding a Window UDF<a class="headerlink" href="#adding-a-window-udf" 
title="Link to this heading">¶</a></h2>
 <p>Scalar UDFs are functions that take a row of data and return a single 
value. Window UDFs are similar, but they also have access to the rows around 
them. Access to the proximal rows is helpful, but adds some complexity to 
the implementation.</p>
-<p>Body coming soon.</p>
+<p>For example, we will declare a user defined window function that computes a 
moving average.</p>
+<div class="highlight-rust notranslate"><div 
class="highlight"><pre><span></span><span class="k">use</span><span class="w"> 
</span><span class="n">datafusion</span>::<span class="n">arrow</span>::<span 
class="p">{</span><span class="n">array</span>::<span class="p">{</span><span 
class="n">ArrayRef</span><span class="p">,</span><span class="w"> </span><span 
class="n">Float64Array</span><span class="p">,</span><span class="w"> 
</span><span class="n">AsArray</span><span class="p">},</span>< [...]
+<span class="k">use</span><span class="w"> </span><span 
class="n">datafusion</span>::<span class="n">logical_expr</span>::<span 
class="p">{</span><span class="n">PartitionEvaluator</span><span 
class="p">};</span>
+<span class="k">use</span><span class="w"> </span><span 
class="n">datafusion</span>::<span class="n">common</span>::<span 
class="n">ScalarValue</span><span class="p">;</span>
+<span class="k">use</span><span class="w"> </span><span 
class="n">datafusion</span>::<span class="n">error</span>::<span 
class="nb">Result</span><span class="p">;</span>
+<span class="sd">/// This implements the lowest level evaluation for a window 
function</span>
+<span class="sd">///</span>
+<span class="sd">/// It handles calculating the value of the window function 
for each</span>
+<span class="sd">/// distinct value of `PARTITION BY`</span>
+<span class="cp">#[derive(Clone, Debug)]</span>
+<span class="k">struct</span> <span 
class="nc">MyPartitionEvaluator</span><span class="w"> </span><span 
class="p">{}</span>
+
+<span class="k">impl</span><span class="w"> </span><span 
class="n">MyPartitionEvaluator</span><span class="w"> </span><span 
class="p">{</span>
+<span class="w">    </span><span class="k">fn</span> <span 
class="nf">new</span><span class="p">()</span><span class="w"> </span>-&gt; 
<span class="nc">Self</span><span class="w"> </span><span class="p">{</span>
+<span class="w">        </span><span class="bp">Self</span><span class="w"> 
</span><span class="p">{}</span>
+<span class="w">    </span><span class="p">}</span>
+<span class="p">}</span>
+
+<span class="sd">/// Different evaluation methods are called depending on the 
various</span>
+<span class="sd">/// settings of WindowUDF. This example uses the simplest and 
most</span>
+<span class="sd">/// general, `evaluate`. See `PartitionEvaluator` for the 
other more</span>
+<span class="sd">/// advanced uses.</span>
+<span class="k">impl</span><span class="w"> </span><span 
class="n">PartitionEvaluator</span><span class="w"> </span><span 
class="k">for</span><span class="w"> </span><span 
class="n">MyPartitionEvaluator</span><span class="w"> </span><span 
class="p">{</span>
+<span class="w">    </span><span class="sd">/// Tell DataFusion the window 
function varies based on the value</span>
+<span class="w">    </span><span class="sd">/// of the window frame.</span>
+<span class="w">    </span><span class="k">fn</span> <span 
class="nf">uses_window_frame</span><span class="p">(</span><span 
class="o">&amp;</span><span class="bp">self</span><span class="p">)</span><span 
class="w"> </span>-&gt; <span class="kt">bool</span> <span class="p">{</span>
+<span class="w">        </span><span class="kc">true</span>
+<span class="w">    </span><span class="p">}</span>
+
+<span class="w">    </span><span class="sd">/// This function is called once 
per input row.</span>
+<span class="w">    </span><span class="sd">///</span>
+<span class="w">    </span><span class="sd">/// `range` specifies which indexes 
of `values` should be</span>
+<span class="w">    </span><span class="sd">/// considered for the 
calculation.</span>
+<span class="w">    </span><span class="sd">///</span>
+<span class="w">    </span><span class="sd">/// Note this is the SLOWEST, but 
simplest, way to evaluate a</span>
+<span class="w">    </span><span class="sd">/// window function. It is much 
faster to implement</span>
+<span class="w">    </span><span class="sd">/// evaluate_all or 
evaluate_all_with_rank, if possible</span>
+<span class="w">    </span><span class="k">fn</span> <span 
class="nf">evaluate</span><span class="p">(</span>
+<span class="w">        </span><span class="o">&amp;</span><span 
class="k">mut</span><span class="w"> </span><span class="bp">self</span><span 
class="p">,</span>
+<span class="w">        </span><span class="n">values</span>: <span 
class="kp">&amp;</span><span class="p">[</span><span 
class="n">ArrayRef</span><span class="p">],</span>
+<span class="w">        </span><span class="n">range</span>: <span 
class="kp">&amp;</span><span class="nc">std</span>::<span 
class="n">ops</span>::<span class="n">Range</span><span 
class="o">&lt;</span><span class="kt">usize</span><span 
class="o">&gt;</span><span class="p">,</span>
+<span class="w">    </span><span class="p">)</span><span class="w"> 
</span>-&gt; <span class="nb">Result</span><span class="o">&lt;</span><span 
class="n">ScalarValue</span><span class="o">&gt;</span><span class="w"> 
</span><span class="p">{</span>
+<span class="w">        </span><span class="c1">// Again, the input argument 
is an array of floating</span>
+<span class="w">        </span><span class="c1">// point numbers to calculate 
a moving average</span>
+<span class="w">        </span><span class="kd">let</span><span class="w"> 
</span><span class="n">arr</span>: <span class="kp">&amp;</span><span 
class="nc">Float64Array</span><span class="w"> </span><span 
class="o">=</span><span class="w"> </span><span class="n">values</span><span 
class="p">[</span><span class="mi">0</span><span class="p">].</span><span 
class="n">as_ref</span><span class="p">().</span><span 
class="n">as_primitive</span>::<span class="o">&lt;</span><span 
class="n">Float64 [...]
+
+<span class="w">        </span><span class="kd">let</span><span class="w"> 
</span><span class="n">range_len</span><span class="w"> </span><span 
class="o">=</span><span class="w"> </span><span class="n">range</span><span 
class="p">.</span><span class="n">end</span><span class="w"> </span><span 
class="o">-</span><span class="w"> </span><span class="n">range</span><span 
class="p">.</span><span class="n">start</span><span class="p">;</span>
+
+<span class="w">        </span><span class="c1">// our smoothing function will 
average all the values in the range</span>
+<span class="w">        </span><span class="kd">let</span><span class="w"> 
</span><span class="n">output</span><span class="w"> </span><span 
class="o">=</span><span class="w"> </span><span class="k">if</span><span 
class="w"> </span><span class="n">range_len</span><span class="w"> </span><span 
class="o">&gt;</span><span class="w"> </span><span class="mi">0</span><span 
class="w"> </span><span class="p">{</span>
+<span class="w">            </span><span class="kd">let</span><span class="w"> 
</span><span class="n">sum</span>: <span class="kt">f64</span> <span 
class="o">=</span><span class="w"> </span><span class="n">arr</span><span 
class="p">.</span><span class="n">values</span><span class="p">().</span><span 
class="n">iter</span><span class="p">().</span><span class="n">skip</span><span 
class="p">(</span><span class="n">range</span><span class="p">.</span><span 
class="n">start</span><span class=" [...]
+<span class="w">            </span><span class="nb">Some</span><span 
class="p">(</span><span class="n">sum</span><span class="w"> </span><span 
class="o">/</span><span class="w"> </span><span class="n">range_len</span><span 
class="w"> </span><span class="k">as</span><span class="w"> </span><span 
class="kt">f64</span><span class="p">)</span>
+<span class="w">        </span><span class="p">}</span><span class="w"> 
</span><span class="k">else</span><span class="w"> </span><span 
class="p">{</span>
+<span class="w">            </span><span class="nb">None</span>
+<span class="w">        </span><span class="p">};</span>
+
+<span class="w">        </span><span class="nb">Ok</span><span 
class="p">(</span><span class="n">ScalarValue</span>::<span 
class="n">Float64</span><span class="p">(</span><span 
class="n">output</span><span class="p">))</span>
+<span class="w">    </span><span class="p">}</span>
+<span class="p">}</span>
+
+<span class="sd">/// Create a `PartitionEvaluator` to evaluate this function on 
a new</span>
+<span class="sd">/// partition.</span>
+<span class="k">fn</span> <span 
class="nf">make_partition_evaluator</span><span class="p">()</span><span 
class="w"> </span>-&gt; <span class="nb">Result</span><span 
class="o">&lt;</span><span class="nb">Box</span><span 
class="o">&lt;</span><span class="k">dyn</span><span class="w"> </span><span 
class="n">PartitionEvaluator</span><span class="o">&gt;&gt;</span><span 
class="w"> </span><span class="p">{</span>
+<span class="w">    </span><span class="nb">Ok</span><span 
class="p">(</span><span class="nb">Box</span>::<span class="n">new</span><span 
class="p">(</span><span class="n">MyPartitionEvaluator</span>::<span 
class="n">new</span><span class="p">()))</span>
+<span class="p">}</span>
+</pre></div>
+</div>
+<section id="registering-a-window-udf">
+<h3>Registering a Window UDF<a class="headerlink" 
href="#registering-a-window-udf" title="Link to this heading">¶</a></h3>
+<p>To register a Window UDF, you need to wrap the function implementation in a 
<code class="docutils literal notranslate"><span 
class="pre">WindowUDF</span></code> struct and then register it with the <code 
class="docutils literal notranslate"><span 
class="pre">SessionContext</span></code>. DataFusion provides the <code 
class="docutils literal notranslate"><span 
class="pre">create_udwf</span></code> helper function to make this easier.</p>
+<div class="highlight-rust notranslate"><div 
class="highlight"><pre><span></span><span class="k">use</span><span class="w"> 
</span><span class="n">datafusion</span>::<span 
class="n">logical_expr</span>::<span class="p">{</span><span 
class="n">Volatility</span><span class="p">,</span><span class="w"> 
</span><span class="n">create_udwf</span><span class="p">};</span>
+<span class="k">use</span><span class="w"> </span><span 
class="n">datafusion</span>::<span class="n">arrow</span>::<span 
class="n">datatypes</span>::<span class="n">DataType</span><span 
class="p">;</span>
+<span class="k">use</span><span class="w"> </span><span 
class="n">std</span>::<span class="n">sync</span>::<span 
class="n">Arc</span><span class="p">;</span>
+
+<span class="c1">// here is where we define the UDWF. We also declare its 
signature:</span>
+<span class="kd">let</span><span class="w"> </span><span 
class="n">smooth_it</span><span class="w"> </span><span class="o">=</span><span 
class="w"> </span><span class="n">create_udwf</span><span class="p">(</span>
+<span class="w">    </span><span class="s">&quot;smooth_it&quot;</span><span 
class="p">,</span>
+<span class="w">    </span><span class="n">DataType</span>::<span 
class="n">Float64</span><span class="p">,</span>
+<span class="w">    </span><span class="n">Arc</span>::<span 
class="n">new</span><span class="p">(</span><span 
class="n">DataType</span>::<span class="n">Float64</span><span 
class="p">),</span>
+<span class="w">    </span><span class="n">Volatility</span>::<span 
class="n">Immutable</span><span class="p">,</span>
+<span class="w">    </span><span class="n">Arc</span>::<span 
class="n">new</span><span class="p">(</span><span 
class="n">make_partition_evaluator</span><span class="p">),</span>
+<span class="p">);</span>
+</pre></div>
+</div>
+<p>The <code class="docutils literal notranslate"><span 
class="pre">create_udwf</span></code> function takes five arguments:</p>
+<ul class="simple">
+<li><p>The first argument is the name of the function. This is the name that 
will be used in SQL queries.</p></li>
+<li><p><strong>The second argument</strong> is the <code class="docutils 
literal notranslate"><span class="pre">DataType</span></code> of the input array 
(attention: this is not a list of arrays). I.e. in this case, the function 
accepts <code class="docutils literal notranslate"><span 
class="pre">Float64</span></code> as argument.</p></li>
+<li><p>The third argument is the return type of the function. I.e. in this 
case, the function returns a <code class="docutils literal notranslate"><span 
class="pre">Float64</span></code>.</p></li>
+<li><p>The fourth argument is the volatility of the function. In short, this 
is used to determine if the function’s performance can be optimized in some 
situations. In this case, the function is <code class="docutils literal 
notranslate"><span class="pre">Immutable</span></code> because it always 
returns the same value for the same input. A random number generator would be 
<code class="docutils literal notranslate"><span 
class="pre">Volatile</span></code> because it returns a different v [...]
+<li><p><strong>The fifth argument</strong> is the function implementation. 
This is the function that we defined above.</p></li>
+</ul>
+<p>That gives us a <code class="docutils literal notranslate"><span 
class="pre">WindowUDF</span></code> that we can register with the <code 
class="docutils literal notranslate"><span 
class="pre">SessionContext</span></code>:</p>
+<div class="highlight-rust notranslate"><div 
class="highlight"><pre><span></span><span class="k">use</span><span class="w"> 
</span><span class="n">datafusion</span>::<span 
class="n">execution</span>::<span class="n">context</span>::<span 
class="n">SessionContext</span><span class="p">;</span>
+
+<span class="kd">let</span><span class="w"> </span><span 
class="n">ctx</span><span class="w"> </span><span class="o">=</span><span 
class="w"> </span><span class="n">SessionContext</span>::<span 
class="n">new</span><span class="p">();</span>
+
+<span class="n">ctx</span><span class="p">.</span><span 
class="n">register_udwf</span><span class="p">(</span><span 
class="n">smooth_it</span><span class="p">);</span>
+</pre></div>
+</div>
+<p>At this point, you can use the <code class="docutils literal 
notranslate"><span class="pre">smooth_it</span></code> function in your 
query:</p>
+<p>For example, if we have a <a class="reference external" 
href="https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/tests/data/cars.csv"><code
 class="docutils literal notranslate"><span 
class="pre">cars.csv</span></code></a> whose contents look like</p>
+<div class="highlight-csv notranslate"><div 
class="highlight"><pre><span></span>car,speed,time
+red,20.0,1996-04-12T12:05:03.000000000
+red,20.3,1996-04-12T12:05:04.000000000
+green,10.0,1996-04-12T12:05:03.000000000
+green,10.3,1996-04-12T12:05:04.000000000
+...
+</pre></div>
+</div>
+<p>Then we can run a query like the one below:</p>
+<div class="highlight-rust notranslate"><div 
class="highlight"><pre><span></span><span class="k">use</span><span class="w"> 
</span><span class="n">datafusion</span>::<span 
class="n">datasource</span>::<span class="n">file_format</span>::<span 
class="n">options</span>::<span class="n">CsvReadOptions</span><span 
class="p">;</span>
+<span class="c1">// register csv table first</span>
+<span class="kd">let</span><span class="w"> </span><span 
class="n">csv_path</span><span class="w"> </span><span class="o">=</span><span 
class="w"> </span><span class="s">&quot;cars.csv&quot;</span><span 
class="p">.</span><span class="n">to_string</span><span class="p">();</span>
+<span class="n">ctx</span><span class="p">.</span><span 
class="n">register_csv</span><span class="p">(</span><span 
class="s">&quot;cars&quot;</span><span class="p">,</span><span class="w"> 
</span><span class="o">&amp;</span><span class="n">csv_path</span><span 
class="p">,</span><span class="w"> </span><span 
class="n">CsvReadOptions</span>::<span class="n">default</span><span 
class="p">().</span><span class="n">has_header</span><span 
class="p">(</span><span class="kc">true</span><span cla [...]
+<span class="c1">// do query with smooth_it</span>
+<span class="kd">let</span><span class="w"> </span><span 
class="n">df</span><span class="w"> </span><span class="o">=</span><span 
class="w"> </span><span class="n">ctx</span>
+<span class="w">    </span><span class="p">.</span><span 
class="n">sql</span><span class="p">(</span>
+<span class="w">        </span><span class="s">&quot;SELECT \</span>
+<span class="s">           car, \</span>
+<span class="s">           speed, \</span>
+<span class="s">           smooth_it(speed) OVER (PARTITION BY car ORDER BY 
time) as smooth_speed,\</span>
+<span class="s">           time \</span>
+<span class="s">           from cars \</span>
+<span class="s">         ORDER BY \</span>
+<span class="s">           car&quot;</span><span class="p">,</span>
+<span class="w">    </span><span class="p">)</span>
+<span class="w">    </span><span class="p">.</span><span 
class="k">await</span><span class="o">?</span><span class="p">;</span>
+<span class="c1">// print the results</span>
+<span class="n">df</span><span class="p">.</span><span 
class="n">show</span><span class="p">().</span><span 
class="k">await</span><span class="o">?</span><span class="p">;</span>
+</pre></div>
+</div>
+<p>The output will look like:</p>
+<div class="highlight-csv notranslate"><div 
class="highlight"><pre><span></span>+-------+-------+--------------------+---------------------+
+| car   | speed | smooth_speed       | time                |
++-------+-------+--------------------+---------------------+
+| green | 10.0  | 10.0               | 1996-04-12T12:05:03 |
+| green | 10.3  | 10.15              | 1996-04-12T12:05:04 |
+| green | 10.4  | 10.233333333333334 | 1996-04-12T12:05:05 |
+| green | 10.5  | 10.3               | 1996-04-12T12:05:06 |
+| green | 11.0  | 10.440000000000001 | 1996-04-12T12:05:07 |
+| green | 12.0  | 10.700000000000001 | 1996-04-12T12:05:08 |
+| green | 14.0  | 11.171428571428573 | 1996-04-12T12:05:09 |
+| green | 15.0  | 11.65              | 1996-04-12T12:05:10 |
+| green | 15.1  | 12.033333333333333 | 1996-04-12T12:05:11 |
+| green | 15.2  | 12.35              | 1996-04-12T12:05:12 |
+| green | 8.0   | 11.954545454545455 | 1996-04-12T12:05:13 |
+| green | 2.0   | 11.125             | 1996-04-12T12:05:14 |
+| red   | 20.0  | 20.0               | 1996-04-12T12:05:03 |
+| red   | 20.3  | 20.15              | 1996-04-12T12:05:04 |
+...
+</pre></div>
+</div>
+</section>
 </section>
 <section id="adding-an-aggregate-udf">
 <h2>Adding an Aggregate UDF<a class="headerlink" 
href="#adding-an-aggregate-udf" title="Link to this heading">¶</a></h2>
 <p>Aggregate UDFs are functions that take a group of rows and return a single 
value. These are akin to SQL’s <code class="docutils literal notranslate"><span 
class="pre">SUM</span></code> or <code class="docutils literal 
notranslate"><span class="pre">COUNT</span></code> functions.</p>
-<p>Body coming soon.</p>
+<p>For example, we will declare a single-type, single return type UDAF that 
computes the geometric mean.</p>
+<div class="highlight-rust notranslate"><div 
class="highlight"><pre><span></span><span class="k">use</span><span class="w"> 
</span><span class="n">datafusion</span>::<span class="n">arrow</span>::<span 
class="n">array</span>::<span class="n">ArrayRef</span><span class="p">;</span>
+<span class="k">use</span><span class="w"> </span><span 
class="n">datafusion</span>::<span class="n">scalar</span>::<span 
class="n">ScalarValue</span><span class="p">;</span>
+<span class="k">use</span><span class="w"> </span><span 
class="n">datafusion</span>::<span class="p">{</span><span 
class="n">error</span>::<span class="nb">Result</span><span 
class="p">,</span><span class="w"> </span><span 
class="n">physical_plan</span>::<span class="n">Accumulator</span><span 
class="p">};</span>
+
+<span class="sd">/// A UDAF has state across multiple rows, and thus we 
require a `struct` with that state.</span>
+<span class="cp">#[derive(Debug)]</span>
+<span class="k">struct</span> <span class="nc">GeometricMean</span><span 
class="w"> </span><span class="p">{</span>
+<span class="w">    </span><span class="n">n</span>: <span 
class="kt">u32</span><span class="p">,</span>
+<span class="w">    </span><span class="n">prod</span>: <span 
class="kt">f64</span><span class="p">,</span>
+<span class="p">}</span>
+
+<span class="k">impl</span><span class="w"> </span><span 
class="n">GeometricMean</span><span class="w"> </span><span class="p">{</span>
+<span class="w">    </span><span class="c1">// how the struct is 
initialized</span>
+<span class="w">    </span><span class="k">pub</span><span class="w"> 
</span><span class="k">fn</span> <span class="nf">new</span><span 
class="p">()</span><span class="w"> </span>-&gt; <span 
class="nc">Self</span><span class="w"> </span><span class="p">{</span>
+<span class="w">        </span><span class="n">GeometricMean</span><span 
class="w"> </span><span class="p">{</span><span class="w"> </span><span 
class="n">n</span>: <span class="mi">0</span><span class="p">,</span><span 
class="w"> </span><span class="n">prod</span>: <span class="mf">1.0</span><span 
class="w"> </span><span class="p">}</span>
+<span class="w">    </span><span class="p">}</span>
+<span class="p">}</span>
+
+<span class="c1">// UDAFs are built using the trait `Accumulator`, that offers 
DataFusion the necessary functions</span>
+<span class="c1">// to use them.</span>
+<span class="k">impl</span><span class="w"> </span><span 
class="n">Accumulator</span><span class="w"> </span><span 
class="k">for</span><span class="w"> </span><span 
class="n">GeometricMean</span><span class="w"> </span><span class="p">{</span>
+<span class="w">    </span><span class="c1">// This function serializes our 
state to `ScalarValue`, which DataFusion uses</span>
+<span class="w">    </span><span class="c1">// to pass this state between 
execution stages.</span>
+<span class="w">    </span><span class="c1">// Note that this can be arbitrary 
data.</span>
+<span class="w">    </span><span class="k">fn</span> <span 
class="nf">state</span><span class="p">(</span><span 
class="o">&amp;</span><span class="bp">self</span><span class="p">)</span><span 
class="w"> </span>-&gt; <span class="nb">Result</span><span 
class="o">&lt;</span><span class="nb">Vec</span><span 
class="o">&lt;</span><span class="n">ScalarValue</span><span 
class="o">&gt;&gt;</span><span class="w"> </span><span class="p">{</span>
+<span class="w">        </span><span class="nb">Ok</span><span 
class="p">(</span><span class="fm">vec!</span><span class="p">[</span>
+<span class="w">            </span><span class="n">ScalarValue</span>::<span 
class="n">from</span><span class="p">(</span><span class="bp">self</span><span 
class="p">.</span><span class="n">prod</span><span class="p">),</span>
+<span class="w">            </span><span class="n">ScalarValue</span>::<span 
class="n">from</span><span class="p">(</span><span class="bp">self</span><span 
class="p">.</span><span class="n">n</span><span class="p">),</span>
+<span class="w">        </span><span class="p">])</span>
+<span class="w">    </span><span class="p">}</span>
+
+<span class="w">    </span><span class="c1">// DataFusion expects this 
function to return the final value of this aggregator.</span>
+<span class="w">    </span><span class="c1">// in this case, this is the 
formula of the geometric mean</span>
+<span class="w">    </span><span class="k">fn</span> <span 
class="nf">evaluate</span><span class="p">(</span><span 
class="o">&amp;</span><span class="bp">self</span><span class="p">)</span><span 
class="w"> </span>-&gt; <span class="nb">Result</span><span 
class="o">&lt;</span><span class="n">ScalarValue</span><span 
class="o">&gt;</span><span class="w"> </span><span class="p">{</span>
+<span class="w">        </span><span class="kd">let</span><span class="w"> 
</span><span class="n">value</span><span class="w"> </span><span 
class="o">=</span><span class="w"> </span><span class="bp">self</span><span 
class="p">.</span><span class="n">prod</span><span class="p">.</span><span 
class="n">powf</span><span class="p">(</span><span class="mf">1.0</span><span 
class="w"> </span><span class="o">/</span><span class="w"> </span><span 
class="bp">self</span><span class="p">.</span><span [...]
+<span class="w">        </span><span class="nb">Ok</span><span 
class="p">(</span><span class="n">ScalarValue</span>::<span 
class="n">from</span><span class="p">(</span><span class="n">value</span><span 
class="p">))</span>
+<span class="w">    </span><span class="p">}</span>
+
+<span class="w">    </span><span class="c1">// DataFusion calls this function 
to update the accumulator&#39;s state for a batch</span>
+<span class="w">    </span><span class="c1">// of input rows. In this case 
the product is updated with values from the first column</span>
+<span class="w">    </span><span class="c1">// and the count is updated based 
on the row count</span>
+<span class="w">    </span><span class="k">fn</span> <span 
class="nf">update_batch</span><span class="p">(</span><span 
class="o">&amp;</span><span class="k">mut</span><span class="w"> </span><span 
class="bp">self</span><span class="p">,</span><span class="w"> </span><span 
class="n">values</span>: <span class="kp">&amp;</span><span 
class="p">[</span><span class="n">ArrayRef</span><span class="p">])</span><span 
class="w"> </span>-&gt; <span class="nb">Result</span><span class="o">&lt;</spa 
[...]
+<span class="w">        </span><span class="k">if</span><span class="w"> 
</span><span class="n">values</span><span class="p">.</span><span 
class="n">is_empty</span><span class="p">()</span><span class="w"> </span><span 
class="p">{</span>
+<span class="w">            </span><span class="k">return</span><span 
class="w"> </span><span class="nb">Ok</span><span class="p">(());</span>
+<span class="w">        </span><span class="p">}</span>
+<span class="w">        </span><span class="kd">let</span><span class="w"> 
</span><span class="n">arr</span><span class="w"> </span><span 
class="o">=</span><span class="w"> </span><span class="o">&amp;</span><span 
class="n">values</span><span class="p">[</span><span class="mi">0</span><span 
class="p">];</span>
+<span class="w">        </span><span class="p">(</span><span 
class="mi">0</span><span class="o">..</span><span class="n">arr</span><span 
class="p">.</span><span class="n">len</span><span class="p">()).</span><span 
class="n">try_for_each</span><span class="p">(</span><span 
class="o">|</span><span class="n">index</span><span class="o">|</span><span 
class="w"> </span><span class="p">{</span>
+<span class="w">            </span><span class="kd">let</span><span class="w"> 
</span><span class="n">v</span><span class="w"> </span><span 
class="o">=</span><span class="w"> </span><span 
class="n">ScalarValue</span>::<span class="n">try_from_array</span><span 
class="p">(</span><span class="n">arr</span><span class="p">,</span><span 
class="w"> </span><span class="n">index</span><span class="p">)</span><span 
class="o">?</span><span class="p">;</span>
+
+<span class="w">            </span><span class="k">if</span><span class="w"> 
</span><span class="kd">let</span><span class="w"> </span><span 
class="n">ScalarValue</span>::<span class="n">Float64</span><span 
class="p">(</span><span class="nb">Some</span><span class="p">(</span><span 
class="n">value</span><span class="p">))</span><span class="w"> </span><span 
class="o">=</span><span class="w"> </span><span class="n">v</span><span 
class="w"> </span><span class="p">{</span>
+<span class="w">                </span><span class="bp">self</span><span 
class="p">.</span><span class="n">prod</span><span class="w"> </span><span 
class="o">*=</span><span class="w"> </span><span class="n">value</span><span 
class="p">;</span>
+<span class="w">                </span><span class="bp">self</span><span 
class="p">.</span><span class="n">n</span><span class="w"> </span><span 
class="o">+=</span><span class="w"> </span><span class="mi">1</span><span 
class="p">;</span>
+<span class="w">            </span><span class="p">}</span><span class="w"> 
</span><span class="k">else</span><span class="w"> </span><span 
class="p">{</span>
+<span class="w">                </span><span 
class="fm">unreachable!</span><span class="p">(</span><span 
class="s">&quot;&quot;</span><span class="p">)</span>
+<span class="w">            </span><span class="p">}</span>
+<span class="w">            </span><span class="nb">Ok</span><span 
class="p">(())</span>
+<span class="w">        </span><span class="p">})</span>
+<span class="w">    </span><span class="p">}</span>
+
+<span class="w">    </span><span class="c1">// Optimization hint: this trait 
also supports `update_batch` and `merge_batch`,</span>
+<span class="w">    </span><span class="c1">// that can be used to perform 
these operations on arrays instead of single values.</span>
+<span class="w">    </span><span class="k">fn</span> <span 
class="nf">merge_batch</span><span class="p">(</span><span 
class="o">&amp;</span><span class="k">mut</span><span class="w"> </span><span 
class="bp">self</span><span class="p">,</span><span class="w"> </span><span 
class="n">states</span>: <span class="kp">&amp;</span><span 
class="p">[</span><span class="n">ArrayRef</span><span class="p">])</span><span 
class="w"> </span>-&gt; <span class="nb">Result</span><span 
class="o">&lt;</span [...]
+<span class="w">        </span><span class="k">if</span><span class="w"> 
</span><span class="n">states</span><span class="p">.</span><span 
class="n">is_empty</span><span class="p">()</span><span class="w"> </span><span 
class="p">{</span>
+<span class="w">            </span><span class="k">return</span><span 
class="w"> </span><span class="nb">Ok</span><span class="p">(());</span>
+<span class="w">        </span><span class="p">}</span>
+<span class="w">        </span><span class="kd">let</span><span class="w"> 
</span><span class="n">arr</span><span class="w"> </span><span 
class="o">=</span><span class="w"> </span><span class="o">&amp;</span><span 
class="n">states</span><span class="p">[</span><span class="mi">0</span><span 
class="p">];</span>
+<span class="w">        </span><span class="p">(</span><span 
class="mi">0</span><span class="o">..</span><span class="n">arr</span><span 
class="p">.</span><span class="n">len</span><span class="p">()).</span><span 
class="n">try_for_each</span><span class="p">(</span><span 
class="o">|</span><span class="n">index</span><span class="o">|</span><span 
class="w"> </span><span class="p">{</span>
+<span class="w">            </span><span class="kd">let</span><span class="w"> 
</span><span class="n">v</span><span class="w"> </span><span 
class="o">=</span><span class="w"> </span><span class="n">states</span>
+<span class="w">                </span><span class="p">.</span><span 
class="n">iter</span><span class="p">()</span>
+<span class="w">                </span><span class="p">.</span><span 
class="n">map</span><span class="p">(</span><span class="o">|</span><span 
class="n">array</span><span class="o">|</span><span class="w"> </span><span 
class="n">ScalarValue</span>::<span class="n">try_from_array</span><span 
class="p">(</span><span class="n">array</span><span class="p">,</span><span 
class="w"> </span><span class="n">index</span><span class="p">))</span>
+<span class="w">                </span><span class="p">.</span><span 
class="n">collect</span>::<span class="o">&lt;</span><span 
class="nb">Result</span><span class="o">&lt;</span><span 
class="nb">Vec</span><span class="o">&lt;</span><span class="n">_</span><span 
class="o">&gt;&gt;&gt;</span><span class="p">()</span><span 
class="o">?</span><span class="p">;</span>
+<span class="w">            </span><span class="k">if</span><span class="w"> 
</span><span class="kd">let</span><span class="w"> </span><span 
class="p">(</span><span class="n">ScalarValue</span>::<span 
class="n">Float64</span><span class="p">(</span><span 
class="nb">Some</span><span class="p">(</span><span class="n">prod</span><span 
class="p">)),</span><span class="w"> </span><span 
class="n">ScalarValue</span>::<span class="n">UInt32</span><span 
class="p">(</span><span class="nb">Some</sp [...]
+<span class="w">            </span><span class="p">{</span>
+<span class="w">                </span><span class="bp">self</span><span 
class="p">.</span><span class="n">prod</span><span class="w"> </span><span 
class="o">*=</span><span class="w"> </span><span class="n">prod</span><span 
class="p">;</span>
+<span class="w">                </span><span class="bp">self</span><span 
class="p">.</span><span class="n">n</span><span class="w"> </span><span 
class="o">+=</span><span class="w"> </span><span class="n">n</span><span 
class="p">;</span>
+<span class="w">            </span><span class="p">}</span><span class="w"> 
</span><span class="k">else</span><span class="w"> </span><span 
class="p">{</span>
+<span class="w">                </span><span 
class="fm">unreachable!</span><span class="p">(</span><span 
class="s">&quot;&quot;</span><span class="p">)</span>
+<span class="w">            </span><span class="p">}</span>
+<span class="w">            </span><span class="nb">Ok</span><span 
class="p">(())</span>
+<span class="w">        </span><span class="p">})</span>
+<span class="w">    </span><span class="p">}</span>
+
+<span class="w">    </span><span class="k">fn</span> <span 
class="nf">size</span><span class="p">(</span><span class="o">&amp;</span><span 
class="bp">self</span><span class="p">)</span><span class="w"> </span>-&gt; 
<span class="kt">usize</span> <span class="p">{</span>
+<span class="w">        </span><span class="n">std</span>::<span 
class="n">mem</span>::<span class="n">size_of_val</span><span 
class="p">(</span><span class="bp">self</span><span class="p">)</span>
+<span class="w">    </span><span class="p">}</span>
+<span class="p">}</span>
+</pre></div>
+</div>
+<section id="registering-an-aggregate-udf">
+<h3>Registering an Aggregate UDF<a class="headerlink" 
href="#registering-an-aggregate-udf" title="Link to this heading">¶</a></h3>
+<p>To register an Aggregate UDF, you need to wrap the function implementation in 
an <code class="docutils literal notranslate"><span 
class="pre">AggregateUDF</span></code> struct and then register it with the 
<code class="docutils literal notranslate"><span 
class="pre">SessionContext</span></code>. DataFusion provides the <code 
class="docutils literal notranslate"><span 
class="pre">create_udaf</span></code> helper function to make this easier.</p>
+<div class="highlight-rust notranslate"><div 
class="highlight"><pre><span></span><span class="k">use</span><span class="w"> 
</span><span class="n">datafusion</span>::<span 
class="n">logical_expr</span>::<span class="p">{</span><span 
class="n">Volatility</span><span class="p">,</span><span class="w"> 
</span><span class="n">create_udaf</span><span class="p">};</span>
+<span class="k">use</span><span class="w"> </span><span 
class="n">datafusion</span>::<span class="n">arrow</span>::<span 
class="n">datatypes</span>::<span class="n">DataType</span><span 
class="p">;</span>
+<span class="k">use</span><span class="w"> </span><span 
class="n">std</span>::<span class="n">sync</span>::<span 
class="n">Arc</span><span class="p">;</span>
+
+<span class="c1">// here is where we define the UDAF. We also declare its 
signature:</span>
+<span class="kd">let</span><span class="w"> </span><span 
class="n">geometric_mean</span><span class="w"> </span><span 
class="o">=</span><span class="w"> </span><span 
class="n">create_udaf</span><span class="p">(</span>
+<span class="w">    </span><span class="c1">// the name; used to represent it 
in plan descriptions and in the registry, to use in SQL.</span>
+<span class="w">    </span><span class="s">&quot;geo_mean&quot;</span><span 
class="p">,</span>
+<span class="w">    </span><span class="c1">// the input type; DataFusion 
guarantees that the first entry of `values` in `update` has this type.</span>
+<span class="w">    </span><span class="fm">vec!</span><span 
class="p">[</span><span class="n">DataType</span>::<span 
class="n">Float64</span><span class="p">],</span>
+<span class="w">    </span><span class="c1">// the return type; DataFusion 
expects this to match the type returned by `evaluate`.</span>
+<span class="w">    </span><span class="n">Arc</span>::<span 
class="n">new</span><span class="p">(</span><span 
class="n">DataType</span>::<span class="n">Float64</span><span 
class="p">),</span>
+<span class="w">    </span><span class="n">Volatility</span>::<span 
class="n">Immutable</span><span class="p">,</span>
+<span class="w">    </span><span class="c1">// This is the accumulator 
factory; DataFusion uses it to create new accumulators.</span>
+<span class="w">    </span><span class="n">Arc</span>::<span 
class="n">new</span><span class="p">(</span><span class="o">|</span><span 
class="n">_</span><span class="o">|</span><span class="w"> </span><span 
class="nb">Ok</span><span class="p">(</span><span class="nb">Box</span>::<span 
class="n">new</span><span class="p">(</span><span 
class="n">GeometricMean</span>::<span class="n">new</span><span 
class="p">()))),</span>
+<span class="w">    </span><span class="c1">// This is the description of the 
state. `state()` must match the types here.</span>
+<span class="w">    </span><span class="n">Arc</span>::<span 
class="n">new</span><span class="p">(</span><span class="fm">vec!</span><span 
class="p">[</span><span class="n">DataType</span>::<span 
class="n">Float64</span><span class="p">,</span><span class="w"> </span><span 
class="n">DataType</span>::<span class="n">UInt32</span><span 
class="p">]),</span>
+<span class="p">);</span>
+</pre></div>
+</div>
+<p>The <code class="docutils literal notranslate"><span 
class="pre">create_udaf</span></code> has six arguments to check:</p>
+<ul class="simple">
+<li><p>The first argument is the name of the function. This is the name that 
will be used in SQL queries.</p></li>
+<li><p>The second argument is a vector of <code class="docutils literal 
notranslate"><span class="pre">DataType</span></code>s. This is the list of 
argument types that the function accepts. I.e. in this case, the function 
accepts a single <code class="docutils literal notranslate"><span 
class="pre">Float64</span></code> argument.</p></li>
+<li><p>The third argument is the return type of the function. I.e. in this 
case, the function returns a <code class="docutils literal notranslate"><span 
class="pre">Float64</span></code>.</p></li>
+<li><p>The fourth argument is the volatility of the function. In short, this 
is used to determine if the function’s performance can be optimized in some 
situations. In this case, the function is <code class="docutils literal 
notranslate"><span class="pre">Immutable</span></code> because it always 
returns the same value for the same input. A random number generator would be 
<code class="docutils literal notranslate"><span 
class="pre">Volatile</span></code> because it returns a different v [...]
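+<li><p>The fifth argument is the accumulator factory. DataFusion calls it to 
create new accumulators; here it builds the <code class="docutils literal 
notranslate"><span class="pre">GeometricMean</span></code> accumulator that we 
defined above.</p></li>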
+<li><p>The sixth argument is the description of the state, which will be 
passed between execution stages.</p></li>
+</ul>
+<p>That gives us an <code class="docutils literal notranslate"><span 
class="pre">AggregateUDF</span></code> that we can register with the <code 
class="docutils literal notranslate"><span 
class="pre">SessionContext</span></code>:</p>
+<div class="highlight-rust notranslate"><div 
class="highlight"><pre><span></span><span class="k">use</span><span class="w"> 
</span><span class="n">datafusion</span>::<span 
class="n">execution</span>::<span class="n">context</span>::<span 
class="n">SessionContext</span><span class="p">;</span>
+
+<span class="kd">let</span><span class="w"> </span><span 
class="n">ctx</span><span class="w"> </span><span class="o">=</span><span 
class="w"> </span><span class="n">SessionContext</span>::<span 
class="n">new</span><span class="p">();</span>
+
+<span class="n">ctx</span><span class="p">.</span><span 
class="n">register_udaf</span><span class="p">(</span><span 
class="n">geometric_mean</span><span class="p">);</span>
+</pre></div>
+</div>
+<p>Then, we can query like below:</p>
+<div class="highlight-rust notranslate"><div 
class="highlight"><pre><span></span><span class="kd">let</span><span class="w"> 
</span><span class="n">df</span><span class="w"> </span><span 
class="o">=</span><span class="w"> </span><span class="n">ctx</span><span 
class="p">.</span><span class="n">sql</span><span class="p">(</span><span 
class="s">&quot;SELECT geo_mean(a) FROM t&quot;</span><span 
class="p">).</span><span class="k">await</span><span class="o">?</span><span 
class="p">;</span>
+</pre></div>
+</div>
+</section>
 </section>
 </section>
 
diff --git a/searchindex.js b/searchindex.js
index 7843a30d39..a9dd80744c 100644
--- a/searchindex.js
+++ b/searchindex.js
@@ -1 +1 @@
-Search.setIndex({"docnames": ["contributor-guide/architecture", 
"contributor-guide/communication", "contributor-guide/index", 
"contributor-guide/quarterly_roadmap", "contributor-guide/roadmap", 
"contributor-guide/specification/index", 
"contributor-guide/specification/invariants", 
"contributor-guide/specification/output-field-name-semantic", "index", 
"library-user-guide/adding-udfs", "library-user-guide/building-logical-plans", 
"library-user-guide/catalogs", "library-user-guide/custom-tab [...]
\ No newline at end of file
+Search.setIndex({"docnames": ["contributor-guide/architecture", 
"contributor-guide/communication", "contributor-guide/index", 
"contributor-guide/quarterly_roadmap", "contributor-guide/roadmap", 
"contributor-guide/specification/index", 
"contributor-guide/specification/invariants", 
"contributor-guide/specification/output-field-name-semantic", "index", 
"library-user-guide/adding-udfs", "library-user-guide/building-logical-plans", 
"library-user-guide/catalogs", "library-user-guide/custom-tab [...]
\ No newline at end of file

Reply via email to