This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 5d3ed9c2e1 feat: Add Aggregate UDF to FFI crate (#14775)
5d3ed9c2e1 is described below

commit 5d3ed9c2e193213c222d933dea7c8bb75ea8b5e8
Author: Tim Saucer <timsau...@gmail.com>
AuthorDate: Thu Jun 5 15:58:51 2025 -0400

    feat: Add Aggregate UDF to FFI crate (#14775)
    
    * Work in progress adding user defined aggregate function FFI support
    
    * Intermediate work. Going through groups accumulator
    
    * MVP for aggregate udf via FFI
    
    * Clean up after rebase
    
    * Add unit test for FFI Accumulator Args
    
    * Adding unit tests and fixing memory errors in aggregate ffi udf
    
    * Working through additional unit and integration tests for UDAF ffi
    
    * Switch to an accumulator that supports `convert_to_state` to get a little better coverage
    
    * Set feature so we do not get a warning in stable rustc
    
    * Add more options to test
    
    * Add unit test for FFI RecordBatchStream
    
    * Add a few more args to ffi accumulator test fn
    
    * Adding more unit tests on ffi aggregate udaf
    
    * taplo format
    
    * Update code comment
    
    * Correct function name
    
    * Temp fix record batch test dependencies
    
    * Address some comments
    
    * Revise comments and address PR comments
    
    * Remove commented code
    
    * Refactor GroupsAccumulator
    
    * Add documentation
    
    * Split integration tests
    
    * Address comments to refactor error handling for opt filter
    
    * Fix linting errors
    
    * Fix linting and add deref
    
    * Remove extra tests and unnecessary code
    
    * Adjustments to FFI aggregate functions after rebase on main
    
    * cargo fmt
    
    * cargo clippy
    
    * Re-implement cleaned up code that was removed in last push
    
    * Minor review comments
    
    ---------
    
    Co-authored-by: Crystal Zhou <crystal.zhouxiao...@hotmail.com>
    Co-authored-by: Andrew Lamb <and...@nerdnetworks.org>
---
 Cargo.lock                                         |   2 +
 Cargo.toml                                         |   2 +-
 .../core/src/datasource/file_format/parquet.rs     |  49 +-
 datafusion/core/src/test_util/mod.rs               |  47 ++
 datafusion/expr-common/src/groups_accumulator.rs   |   2 +-
 datafusion/ffi/Cargo.toml                          |   3 +
 datafusion/ffi/src/arrow_wrappers.rs               |  33 +-
 datafusion/ffi/src/lib.rs                          |   1 +
 datafusion/ffi/src/plan_properties.rs              |  14 +-
 datafusion/ffi/src/record_batch_stream.rs          |  46 ++
 datafusion/ffi/src/tests/mod.rs                    |  15 +-
 datafusion/ffi/src/tests/udf_udaf_udwf.rs          |  17 +-
 datafusion/ffi/src/udaf/accumulator.rs             | 366 ++++++++++
 datafusion/ffi/src/udaf/accumulator_args.rs        | 198 ++++++
 datafusion/ffi/src/udaf/groups_accumulator.rs      | 513 ++++++++++++++
 datafusion/ffi/src/udaf/mod.rs                     | 733 +++++++++++++++++++++
 datafusion/ffi/src/util.rs                         |   2 +-
 datafusion/ffi/tests/ffi_integration.rs            |   1 -
 datafusion/ffi/tests/ffi_udaf.rs                   | 129 ++++
 19 files changed, 2110 insertions(+), 63 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 7fa226df1f..77f0e8d1e3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2242,7 +2242,9 @@ dependencies = [
  "async-ffi",
  "async-trait",
  "datafusion",
+ "datafusion-functions-aggregate-common",
  "datafusion-proto",
+ "datafusion-proto-common",
  "doc-comment",
  "futures",
  "log",
diff --git a/Cargo.toml b/Cargo.toml
index 767b66805f..64483eeb93 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -216,5 +216,5 @@ unnecessary_lazy_evaluations = "warn"
 uninlined_format_args = "warn"
 
 [workspace.lints.rust]
-unexpected_cfgs = { level = "warn", check-cfg = ["cfg(tarpaulin)"] }
+unexpected_cfgs = { level = "warn", check-cfg = ["cfg(tarpaulin)", 
"cfg(tarpaulin_include)"] }
 unused_qualifications = "deny"
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs 
b/datafusion/core/src/datasource/file_format/parquet.rs
index 9705225c24..6a5c19829c 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -107,10 +107,8 @@ pub(crate) mod test_util {
 mod tests {
 
     use std::fmt::{self, Display, Formatter};
-    use std::pin::Pin;
     use std::sync::atomic::{AtomicUsize, Ordering};
     use std::sync::Arc;
-    use std::task::{Context, Poll};
     use std::time::Duration;
 
     use crate::datasource::file_format::parquet::test_util::store_parquet;
@@ -120,7 +118,7 @@ mod tests {
     use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
 
     use arrow::array::RecordBatch;
-    use arrow_schema::{Schema, SchemaRef};
+    use arrow_schema::Schema;
     use datafusion_catalog::Session;
     use datafusion_common::cast::{
         as_binary_array, as_binary_view_array, as_boolean_array, 
as_float32_array,
@@ -140,7 +138,7 @@ mod tests {
     };
     use datafusion_execution::object_store::ObjectStoreUrl;
     use datafusion_execution::runtime_env::RuntimeEnv;
-    use datafusion_execution::{RecordBatchStream, TaskContext};
+    use datafusion_execution::TaskContext;
     use datafusion_expr::dml::InsertOp;
     use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
     use datafusion_physical_plan::{collect, ExecutionPlan};
@@ -153,7 +151,7 @@ mod tests {
     use async_trait::async_trait;
     use datafusion_datasource::file_groups::FileGroup;
     use futures::stream::BoxStream;
-    use futures::{Stream, StreamExt};
+    use futures::StreamExt;
     use insta::assert_snapshot;
     use log::error;
     use object_store::local::LocalFileSystem;
@@ -169,6 +167,8 @@ mod tests {
     use parquet::format::FileMetaData;
     use tokio::fs::File;
 
+    use crate::test_util::bounded_stream;
+
     enum ForceViews {
         Yes,
         No,
@@ -1662,43 +1662,4 @@ mod tests {
 
         Ok(())
     }
-
-    /// Creates an bounded stream for testing purposes.
-    fn bounded_stream(
-        batch: RecordBatch,
-        limit: usize,
-    ) -> datafusion_execution::SendableRecordBatchStream {
-        Box::pin(BoundedStream {
-            count: 0,
-            limit,
-            batch,
-        })
-    }
-
-    struct BoundedStream {
-        limit: usize,
-        count: usize,
-        batch: RecordBatch,
-    }
-
-    impl Stream for BoundedStream {
-        type Item = Result<RecordBatch>;
-
-        fn poll_next(
-            mut self: Pin<&mut Self>,
-            _cx: &mut Context<'_>,
-        ) -> Poll<Option<Self::Item>> {
-            if self.count >= self.limit {
-                return Poll::Ready(None);
-            }
-            self.count += 1;
-            Poll::Ready(Some(Ok(self.batch.clone())))
-        }
-    }
-
-    impl RecordBatchStream for BoundedStream {
-        fn schema(&self) -> SchemaRef {
-            self.batch.schema()
-        }
-    }
 }
diff --git a/datafusion/core/src/test_util/mod.rs 
b/datafusion/core/src/test_util/mod.rs
index d6865ca3d5..2f8e66a2bb 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -22,12 +22,14 @@ pub mod parquet;
 
 pub mod csv;
 
+use futures::Stream;
 use std::any::Any;
 use std::collections::HashMap;
 use std::fs::File;
 use std::io::Write;
 use std::path::Path;
 use std::sync::Arc;
+use std::task::{Context, Poll};
 
 use crate::catalog::{TableProvider, TableProviderFactory};
 use crate::dataframe::DataFrame;
@@ -38,11 +40,13 @@ use crate::logical_expr::{LogicalPlanBuilder, 
UNNAMED_TABLE};
 use crate::physical_plan::ExecutionPlan;
 use crate::prelude::{CsvReadOptions, SessionContext};
 
+use crate::execution::SendableRecordBatchStream;
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use datafusion_catalog::Session;
 use datafusion_common::TableReference;
 use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
+use std::pin::Pin;
 
 use async_trait::async_trait;
 
@@ -52,6 +56,8 @@ use tempfile::TempDir;
 pub use datafusion_common::test_util::parquet_test_data;
 pub use datafusion_common::test_util::{arrow_test_data, get_data_dir};
 
+use crate::execution::RecordBatchStream;
+
 /// Scan an empty data source, mainly used in tests
 pub fn scan_empty(
     name: Option<&str>,
@@ -234,3 +240,44 @@ pub fn register_unbounded_file_with_ordering(
     ctx.register_table(table_name, 
Arc::new(StreamTable::new(Arc::new(config))))?;
     Ok(())
 }
+
+/// Creates a bounded stream that emits the same record batch a specified number of times.
+/// This is useful for testing purposes.
+pub fn bounded_stream(
+    record_batch: RecordBatch,
+    limit: usize,
+) -> SendableRecordBatchStream {
+    Box::pin(BoundedStream {
+        record_batch,
+        count: 0,
+        limit,
+    })
+}
+
+struct BoundedStream {
+    record_batch: RecordBatch,
+    count: usize,
+    limit: usize,
+}
+
+impl Stream for BoundedStream {
+    type Item = Result<RecordBatch, crate::error::DataFusionError>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        if self.count >= self.limit {
+            Poll::Ready(None)
+        } else {
+            self.count += 1;
+            Poll::Ready(Some(Ok(self.record_batch.clone())))
+        }
+    }
+}
+
+impl RecordBatchStream for BoundedStream {
+    fn schema(&self) -> SchemaRef {
+        self.record_batch.schema()
+    }
+}
diff --git a/datafusion/expr-common/src/groups_accumulator.rs 
b/datafusion/expr-common/src/groups_accumulator.rs
index 5ff1c1d072..9bcc1edff8 100644
--- a/datafusion/expr-common/src/groups_accumulator.rs
+++ b/datafusion/expr-common/src/groups_accumulator.rs
@@ -21,7 +21,7 @@ use arrow::array::{ArrayRef, BooleanArray};
 use datafusion_common::{not_impl_err, Result};
 
 /// Describes how many rows should be emitted during grouping.
-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum EmitTo {
     /// Emit all groups
     All,
diff --git a/datafusion/ffi/Cargo.toml b/datafusion/ffi/Cargo.toml
index 29f40df514..a8335769ec 100644
--- a/datafusion/ffi/Cargo.toml
+++ b/datafusion/ffi/Cargo.toml
@@ -44,7 +44,9 @@ arrow-schema = { workspace = true }
 async-ffi = { version = "0.5.0", features = ["abi_stable"] }
 async-trait = { workspace = true }
 datafusion = { workspace = true, default-features = false }
+datafusion-functions-aggregate-common = { workspace = true }
 datafusion-proto = { workspace = true }
+datafusion-proto-common = { workspace = true }
 futures = { workspace = true }
 log = { workspace = true }
 prost = { workspace = true }
@@ -56,3 +58,4 @@ doc-comment = { workspace = true }
 
 [features]
 integration-tests = []
+tarpaulin_include = [] # Exists only to prevent warnings on stable and still have accurate coverage
diff --git a/datafusion/ffi/src/arrow_wrappers.rs 
b/datafusion/ffi/src/arrow_wrappers.rs
index eb1f34b3d9..7b3751dcae 100644
--- a/datafusion/ffi/src/arrow_wrappers.rs
+++ b/datafusion/ffi/src/arrow_wrappers.rs
@@ -21,7 +21,8 @@ use abi_stable::StableAbi;
 use arrow::{
     array::{make_array, ArrayRef},
     datatypes::{Schema, SchemaRef},
-    ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema},
+    error::ArrowError,
+    ffi::{from_ffi, to_ffi, FFI_ArrowArray, FFI_ArrowSchema},
 };
 use log::error;
 
@@ -44,16 +45,19 @@ impl From<SchemaRef> for WrappedSchema {
         WrappedSchema(ffi_schema)
     }
 }
+/// Some functions are expected to always succeed, like getting the schema from a TableProvider.
+/// Since going through the FFI always has the potential to fail, we need to catch these errors,
+/// give the user a warning, and return some kind of result. In this case we default to an
+/// empty schema.
+#[cfg(not(tarpaulin_include))]
+fn catch_df_schema_error(e: ArrowError) -> Schema {
+    error!("Unable to convert from FFI_ArrowSchema to DataFusion Schema in 
FFI_PlanProperties. {e}");
+    Schema::empty()
+}
 
 impl From<WrappedSchema> for SchemaRef {
     fn from(value: WrappedSchema) -> Self {
-        let schema = match Schema::try_from(&value.0) {
-            Ok(s) => s,
-            Err(e) => {
-                error!("Unable to convert from FFI_ArrowSchema to DataFusion 
Schema in FFI_PlanProperties. {e}");
-                Schema::empty()
-            }
-        };
+        let schema = 
Schema::try_from(&value.0).unwrap_or_else(catch_df_schema_error);
         Arc::new(schema)
     }
 }
@@ -71,7 +75,7 @@ pub struct WrappedArray {
 }
 
 impl TryFrom<WrappedArray> for ArrayRef {
-    type Error = arrow::error::ArrowError;
+    type Error = ArrowError;
 
     fn try_from(value: WrappedArray) -> Result<Self, Self::Error> {
         let data = unsafe { from_ffi(value.array, &value.schema.0)? };
@@ -79,3 +83,14 @@ impl TryFrom<WrappedArray> for ArrayRef {
         Ok(make_array(data))
     }
 }
+
+impl TryFrom<&ArrayRef> for WrappedArray {
+    type Error = ArrowError;
+
+    fn try_from(array: &ArrayRef) -> Result<Self, Self::Error> {
+        let (array, schema) = to_ffi(&array.to_data())?;
+        let schema = WrappedSchema(schema);
+
+        Ok(WrappedArray { array, schema })
+    }
+}
diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs
index d877e182a1..755c460f31 100644
--- a/datafusion/ffi/src/lib.rs
+++ b/datafusion/ffi/src/lib.rs
@@ -34,6 +34,7 @@ pub mod schema_provider;
 pub mod session_config;
 pub mod table_provider;
 pub mod table_source;
+pub mod udaf;
 pub mod udf;
 pub mod udtf;
 pub mod util;
diff --git a/datafusion/ffi/src/plan_properties.rs 
b/datafusion/ffi/src/plan_properties.rs
index 5c878fa4be..587e667a47 100644
--- a/datafusion/ffi/src/plan_properties.rs
+++ b/datafusion/ffi/src/plan_properties.rs
@@ -300,7 +300,10 @@ impl From<FFI_EmissionType> for EmissionType {
 
 #[cfg(test)]
 mod tests {
-    use datafusion::physical_plan::Partitioning;
+    use datafusion::{
+        physical_expr::{LexOrdering, PhysicalSortExpr},
+        physical_plan::Partitioning,
+    };
 
     use super::*;
 
@@ -311,8 +314,13 @@ mod tests {
             Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, 
false)]));
 
         let original_props = PlanProperties::new(
-            EquivalenceProperties::new(schema),
-            Partitioning::UnknownPartitioning(3),
+            EquivalenceProperties::new(Arc::clone(&schema)).with_reorder(
+                LexOrdering::new(vec![PhysicalSortExpr {
+                    expr: datafusion::physical_plan::expressions::col("a", 
&schema)?,
+                    options: Default::default(),
+                }]),
+            ),
+            Partitioning::RoundRobinBatch(3),
             EmissionType::Incremental,
             Boundedness::Bounded,
         );
diff --git a/datafusion/ffi/src/record_batch_stream.rs 
b/datafusion/ffi/src/record_batch_stream.rs
index 939c405002..78d65a816f 100644
--- a/datafusion/ffi/src/record_batch_stream.rs
+++ b/datafusion/ffi/src/record_batch_stream.rs
@@ -196,3 +196,49 @@ impl Stream for FFI_RecordBatchStream {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion::{
+        common::record_batch, error::Result, 
execution::SendableRecordBatchStream,
+        test_util::bounded_stream,
+    };
+
+    use super::FFI_RecordBatchStream;
+    use futures::StreamExt;
+
+    #[tokio::test]
+    async fn test_round_trip_record_batch_stream() -> Result<()> {
+        let record_batch = record_batch!(
+            ("a", Int32, vec![1, 2, 3]),
+            ("b", Float64, vec![Some(4.0), None, Some(5.0)])
+        )?;
+        let original_rbs = bounded_stream(record_batch.clone(), 1);
+
+        let ffi_rbs: FFI_RecordBatchStream = original_rbs.into();
+        let mut ffi_rbs: SendableRecordBatchStream = Box::pin(ffi_rbs);
+
+        let schema = ffi_rbs.schema();
+        assert_eq!(
+            schema,
+            Arc::new(Schema::new(vec![
+                Field::new("a", DataType::Int32, true),
+                Field::new("b", DataType::Float64, true)
+            ]))
+        );
+
+        let batch = ffi_rbs.next().await;
+        assert!(batch.is_some());
+        assert!(batch.as_ref().unwrap().is_ok());
+        assert_eq!(batch.unwrap().unwrap(), record_batch);
+
+        // There should only be one batch
+        let no_batch = ffi_rbs.next().await;
+        assert!(no_batch.is_none());
+
+        Ok(())
+    }
+}
diff --git a/datafusion/ffi/src/tests/mod.rs b/datafusion/ffi/src/tests/mod.rs
index 7a36ee52bd..f65ed7441b 100644
--- a/datafusion/ffi/src/tests/mod.rs
+++ b/datafusion/ffi/src/tests/mod.rs
@@ -29,6 +29,8 @@ use catalog::create_catalog_provider;
 
 use crate::{catalog_provider::FFI_CatalogProvider, udtf::FFI_TableFunction};
 
+use crate::udaf::FFI_AggregateUDF;
+
 use super::{table_provider::FFI_TableProvider, udf::FFI_ScalarUDF};
 use arrow::array::RecordBatch;
 use async_provider::create_async_table_provider;
@@ -37,7 +39,10 @@ use datafusion::{
     common::record_batch,
 };
 use sync_provider::create_sync_table_provider;
-use udf_udaf_udwf::{create_ffi_abs_func, create_ffi_random_func, 
create_ffi_table_func};
+use udf_udaf_udwf::{
+    create_ffi_abs_func, create_ffi_random_func, create_ffi_stddev_func,
+    create_ffi_sum_func, create_ffi_table_func,
+};
 
 mod async_provider;
 pub mod catalog;
@@ -65,6 +70,12 @@ pub struct ForeignLibraryModule {
 
     pub create_table_function: extern "C" fn() -> FFI_TableFunction,
 
+    /// Create an aggregate UDAF using sum
+    pub create_sum_udaf: extern "C" fn() -> FFI_AggregateUDF,
+
+    /// Create a grouping UDAF using stddev
+    pub create_stddev_udaf: extern "C" fn() -> FFI_AggregateUDF,
+
     pub version: extern "C" fn() -> u64,
 }
 
@@ -112,6 +123,8 @@ pub fn get_foreign_library_module() -> 
ForeignLibraryModuleRef {
         create_scalar_udf: create_ffi_abs_func,
         create_nullary_udf: create_ffi_random_func,
         create_table_function: create_ffi_table_func,
+        create_sum_udaf: create_ffi_sum_func,
+        create_stddev_udaf: create_ffi_stddev_func,
         version: super::version,
     }
     .leak_into_prefix()
diff --git a/datafusion/ffi/src/tests/udf_udaf_udwf.rs 
b/datafusion/ffi/src/tests/udf_udaf_udwf.rs
index c3cb1bcc35..6aa69bdd0c 100644
--- a/datafusion/ffi/src/tests/udf_udaf_udwf.rs
+++ b/datafusion/ffi/src/tests/udf_udaf_udwf.rs
@@ -15,12 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::{udf::FFI_ScalarUDF, udtf::FFI_TableFunction};
+use crate::{udaf::FFI_AggregateUDF, udf::FFI_ScalarUDF, 
udtf::FFI_TableFunction};
 use datafusion::{
     catalog::TableFunctionImpl,
     functions::math::{abs::AbsFunc, random::RandomFunc},
+    functions_aggregate::{stddev::Stddev, sum::Sum},
     functions_table::generate_series::RangeFunc,
-    logical_expr::ScalarUDF,
+    logical_expr::{AggregateUDF, ScalarUDF},
 };
 
 use std::sync::Arc;
@@ -42,3 +43,15 @@ pub(crate) extern "C" fn create_ffi_table_func() -> 
FFI_TableFunction {
 
     FFI_TableFunction::new(udtf, None)
 }
+
+pub(crate) extern "C" fn create_ffi_sum_func() -> FFI_AggregateUDF {
+    let udaf: Arc<AggregateUDF> = Arc::new(Sum::new().into());
+
+    udaf.into()
+}
+
+pub(crate) extern "C" fn create_ffi_stddev_func() -> FFI_AggregateUDF {
+    let udaf: Arc<AggregateUDF> = Arc::new(Stddev::new().into());
+
+    udaf.into()
+}
diff --git a/datafusion/ffi/src/udaf/accumulator.rs 
b/datafusion/ffi/src/udaf/accumulator.rs
new file mode 100644
index 0000000000..80b872159f
--- /dev/null
+++ b/datafusion/ffi/src/udaf/accumulator.rs
@@ -0,0 +1,366 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{ffi::c_void, ops::Deref};
+
+use abi_stable::{
+    std_types::{RResult, RString, RVec},
+    StableAbi,
+};
+use arrow::{array::ArrayRef, error::ArrowError};
+use datafusion::{
+    error::{DataFusionError, Result},
+    logical_expr::Accumulator,
+    scalar::ScalarValue,
+};
+use prost::Message;
+
+use crate::{arrow_wrappers::WrappedArray, df_result, rresult, rresult_return};
+
+/// A stable struct for sharing [`Accumulator`] across FFI boundaries.
+/// For an explanation of each field, see the corresponding function
+/// defined in [`Accumulator`].
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+#[allow(non_camel_case_types)]
+pub struct FFI_Accumulator {
+    pub update_batch: unsafe extern "C" fn(
+        accumulator: &mut Self,
+        values: RVec<WrappedArray>,
+    ) -> RResult<(), RString>,
+
+    // Evaluate and return a ScalarValues as protobuf bytes
+    pub evaluate:
+        unsafe extern "C" fn(accumulator: &mut Self) -> RResult<RVec<u8>, 
RString>,
+
+    pub size: unsafe extern "C" fn(accumulator: &Self) -> usize,
+
+    pub state:
+        unsafe extern "C" fn(accumulator: &mut Self) -> 
RResult<RVec<RVec<u8>>, RString>,
+
+    pub merge_batch: unsafe extern "C" fn(
+        accumulator: &mut Self,
+        states: RVec<WrappedArray>,
+    ) -> RResult<(), RString>,
+
+    pub retract_batch: unsafe extern "C" fn(
+        accumulator: &mut Self,
+        values: RVec<WrappedArray>,
+    ) -> RResult<(), RString>,
+
+    pub supports_retract_batch: bool,
+
+    /// Release the memory of the private data when it is no longer being used.
+    pub release: unsafe extern "C" fn(accumulator: &mut Self),
+
+    /// Internal data. This is only to be accessed by the provider of the accumulator.
+    /// A [`ForeignAccumulator`] should never attempt to access this data.
+    pub private_data: *mut c_void,
+}
+
+unsafe impl Send for FFI_Accumulator {}
+unsafe impl Sync for FFI_Accumulator {}
+
+pub struct AccumulatorPrivateData {
+    pub accumulator: Box<dyn Accumulator>,
+}
+
+impl FFI_Accumulator {
+    #[inline]
+    unsafe fn inner_mut(&mut self) -> &mut Box<dyn Accumulator> {
+        let private_data = self.private_data as *mut AccumulatorPrivateData;
+        &mut (*private_data).accumulator
+    }
+
+    #[inline]
+    unsafe fn inner(&self) -> &dyn Accumulator {
+        let private_data = self.private_data as *const AccumulatorPrivateData;
+        (*private_data).accumulator.deref()
+    }
+}
+
+unsafe extern "C" fn update_batch_fn_wrapper(
+    accumulator: &mut FFI_Accumulator,
+    values: RVec<WrappedArray>,
+) -> RResult<(), RString> {
+    let accumulator = accumulator.inner_mut();
+
+    let values_arrays = values
+        .into_iter()
+        .map(|v| v.try_into().map_err(DataFusionError::from))
+        .collect::<Result<Vec<ArrayRef>>>();
+    let values_arrays = rresult_return!(values_arrays);
+
+    rresult!(accumulator.update_batch(&values_arrays))
+}
+
+unsafe extern "C" fn evaluate_fn_wrapper(
+    accumulator: &mut FFI_Accumulator,
+) -> RResult<RVec<u8>, RString> {
+    let accumulator = accumulator.inner_mut();
+
+    let scalar_result = rresult_return!(accumulator.evaluate());
+    let proto_result: datafusion_proto::protobuf::ScalarValue =
+        rresult_return!((&scalar_result).try_into());
+
+    RResult::ROk(proto_result.encode_to_vec().into())
+}
+
+unsafe extern "C" fn size_fn_wrapper(accumulator: &FFI_Accumulator) -> usize {
+    accumulator.inner().size()
+}
+
+unsafe extern "C" fn state_fn_wrapper(
+    accumulator: &mut FFI_Accumulator,
+) -> RResult<RVec<RVec<u8>>, RString> {
+    let accumulator = accumulator.inner_mut();
+
+    let state = rresult_return!(accumulator.state());
+    let state = state
+        .into_iter()
+        .map(|state_val| {
+            datafusion_proto::protobuf::ScalarValue::try_from(&state_val)
+                .map_err(DataFusionError::from)
+                .map(|v| RVec::from(v.encode_to_vec()))
+        })
+        .collect::<Result<Vec<_>>>()
+        .map(|state_vec| state_vec.into());
+
+    rresult!(state)
+}
+
+unsafe extern "C" fn merge_batch_fn_wrapper(
+    accumulator: &mut FFI_Accumulator,
+    states: RVec<WrappedArray>,
+) -> RResult<(), RString> {
+    let accumulator = accumulator.inner_mut();
+
+    let states = rresult_return!(states
+        .into_iter()
+        .map(|state| ArrayRef::try_from(state).map_err(DataFusionError::from))
+        .collect::<Result<Vec<_>>>());
+
+    rresult!(accumulator.merge_batch(&states))
+}
+
+unsafe extern "C" fn retract_batch_fn_wrapper(
+    accumulator: &mut FFI_Accumulator,
+    values: RVec<WrappedArray>,
+) -> RResult<(), RString> {
+    let accumulator = accumulator.inner_mut();
+
+    let values_arrays = values
+        .into_iter()
+        .map(|v| v.try_into().map_err(DataFusionError::from))
+        .collect::<Result<Vec<ArrayRef>>>();
+    let values_arrays = rresult_return!(values_arrays);
+
+    rresult!(accumulator.retract_batch(&values_arrays))
+}
+
+unsafe extern "C" fn release_fn_wrapper(accumulator: &mut FFI_Accumulator) {
+    let private_data =
+        Box::from_raw(accumulator.private_data as *mut AccumulatorPrivateData);
+    drop(private_data);
+}
+
+impl From<Box<dyn Accumulator>> for FFI_Accumulator {
+    fn from(accumulator: Box<dyn Accumulator>) -> Self {
+        let supports_retract_batch = accumulator.supports_retract_batch();
+        let private_data = AccumulatorPrivateData { accumulator };
+
+        Self {
+            update_batch: update_batch_fn_wrapper,
+            evaluate: evaluate_fn_wrapper,
+            size: size_fn_wrapper,
+            state: state_fn_wrapper,
+            merge_batch: merge_batch_fn_wrapper,
+            retract_batch: retract_batch_fn_wrapper,
+            supports_retract_batch,
+            release: release_fn_wrapper,
+            private_data: Box::into_raw(Box::new(private_data)) as *mut c_void,
+        }
+    }
+}
+
+impl Drop for FFI_Accumulator {
+    fn drop(&mut self) {
+        unsafe { (self.release)(self) }
+    }
+}
+
+/// This struct is used to access an [`Accumulator`] provided by a foreign
+/// library across an FFI boundary.
+///
+/// The ForeignAccumulator is to be used by the caller of the UDAF, so it has
+/// no knowledge of or access to the private data. All interaction with the
+/// accumulator must occur through the functions defined in FFI_Accumulator.
+#[derive(Debug)]
+pub struct ForeignAccumulator {
+    accumulator: FFI_Accumulator,
+}
+
+unsafe impl Send for ForeignAccumulator {}
+unsafe impl Sync for ForeignAccumulator {}
+
+impl From<FFI_Accumulator> for ForeignAccumulator {
+    fn from(accumulator: FFI_Accumulator) -> Self {
+        Self { accumulator }
+    }
+}
+
+impl Accumulator for ForeignAccumulator {
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        unsafe {
+            let values = values
+                .iter()
+                .map(WrappedArray::try_from)
+                .collect::<std::result::Result<Vec<_>, ArrowError>>()?;
+            df_result!((self.accumulator.update_batch)(
+                &mut self.accumulator,
+                values.into()
+            ))
+        }
+    }
+
+    fn evaluate(&mut self) -> Result<ScalarValue> {
+        unsafe {
+            let scalar_bytes =
+                df_result!((self.accumulator.evaluate)(&mut 
self.accumulator))?;
+
+            let proto_scalar =
+                
datafusion_proto::protobuf::ScalarValue::decode(scalar_bytes.as_ref())
+                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+            ScalarValue::try_from(&proto_scalar).map_err(DataFusionError::from)
+        }
+    }
+
+    fn size(&self) -> usize {
+        unsafe { (self.accumulator.size)(&self.accumulator) }
+    }
+
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
+        unsafe {
+            let state_protos =
+                df_result!((self.accumulator.state)(&mut self.accumulator))?;
+
+            state_protos
+                .into_iter()
+                .map(|proto_bytes| {
+                    
datafusion_proto::protobuf::ScalarValue::decode(proto_bytes.as_ref())
+                        .map_err(|e| DataFusionError::External(Box::new(e)))
+                        .and_then(|proto_value| {
+                            ScalarValue::try_from(&proto_value)
+                                .map_err(DataFusionError::from)
+                        })
+                })
+                .collect::<Result<Vec<_>>>()
+        }
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        unsafe {
+            let states = states
+                .iter()
+                .map(WrappedArray::try_from)
+                .collect::<std::result::Result<Vec<_>, ArrowError>>()?;
+            df_result!((self.accumulator.merge_batch)(
+                &mut self.accumulator,
+                states.into()
+            ))
+        }
+    }
+
+    fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        unsafe {
+            let values = values
+                .iter()
+                .map(WrappedArray::try_from)
+                .collect::<std::result::Result<Vec<_>, ArrowError>>()?;
+            df_result!((self.accumulator.retract_batch)(
+                &mut self.accumulator,
+                values.into()
+            ))
+        }
+    }
+
+    fn supports_retract_batch(&self) -> bool {
+        self.accumulator.supports_retract_batch
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::array::{make_array, Array};
+    use datafusion::{
+        common::create_array, error::Result,
+        functions_aggregate::average::AvgAccumulator, 
logical_expr::Accumulator,
+        scalar::ScalarValue,
+    };
+
+    use super::{FFI_Accumulator, ForeignAccumulator};
+
+    #[test]
+    fn test_foreign_avg_accumulator() -> Result<()> {
+        let original_accum = AvgAccumulator::default();
+        let original_size = original_accum.size();
+        let original_supports_retract = 
original_accum.supports_retract_batch();
+
+        let boxed_accum: Box<dyn Accumulator> = Box::new(original_accum);
+        let ffi_accum: FFI_Accumulator = boxed_accum.into();
+        let mut foreign_accum: ForeignAccumulator = ffi_accum.into();
+
+        // Send in an array to average. There are 5 values and it should average to 30.0
+        let values = create_array!(Float64, vec![10., 20., 30., 40., 50.]);
+        foreign_accum.update_batch(&[values])?;
+
+        let avg = foreign_accum.evaluate()?;
+        assert_eq!(avg, ScalarValue::Float64(Some(30.0)));
+
+        let state = foreign_accum.state()?;
+        assert_eq!(state.len(), 2);
+        assert_eq!(state[0], ScalarValue::UInt64(Some(5)));
+        assert_eq!(state[1], ScalarValue::Float64(Some(150.0)));
+
+        // To verify merging batches works, create a second state to add in
+        // This should cause our average to go down to 25.0
+        let second_states = vec![
+            make_array(create_array!(UInt64, vec![1]).to_data()),
+            make_array(create_array!(Float64, vec![0.0]).to_data()),
+        ];
+
+        foreign_accum.merge_batch(&second_states)?;
+        let avg = foreign_accum.evaluate()?;
+        assert_eq!(avg, ScalarValue::Float64(Some(25.0)));
+
+        // If we remove a batch that is equivalent to the state we added
+        // we should go back to our original value of 30.0
+        let values = create_array!(Float64, vec![0.0]);
+        foreign_accum.retract_batch(&[values])?;
+        let avg = foreign_accum.evaluate()?;
+        assert_eq!(avg, ScalarValue::Float64(Some(30.0)));
+
+        assert_eq!(original_size, foreign_accum.size());
+        assert_eq!(
+            original_supports_retract,
+            foreign_accum.supports_retract_batch()
+        );
+
+        Ok(())
+    }
+}
diff --git a/datafusion/ffi/src/udaf/accumulator_args.rs 
b/datafusion/ffi/src/udaf/accumulator_args.rs
new file mode 100644
index 0000000000..699af1d5c5
--- /dev/null
+++ b/datafusion/ffi/src/udaf/accumulator_args.rs
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::arrow_wrappers::WrappedSchema;
+use abi_stable::{
+    std_types::{RString, RVec},
+    StableAbi,
+};
+use arrow::{datatypes::Schema, ffi::FFI_ArrowSchema};
+use arrow_schema::FieldRef;
+use datafusion::{
+    error::DataFusionError, logical_expr::function::AccumulatorArgs,
+    physical_expr::LexOrdering, physical_plan::PhysicalExpr, 
prelude::SessionContext,
+};
+use datafusion_proto::{
+    physical_plan::{
+        from_proto::{parse_physical_exprs, parse_physical_sort_exprs},
+        to_proto::{serialize_physical_exprs, serialize_physical_sort_exprs},
+        DefaultPhysicalExtensionCodec,
+    },
+    protobuf::PhysicalAggregateExprNode,
+};
+use prost::Message;
+
+/// A stable struct for sharing [`AccumulatorArgs`] across FFI boundaries.
+/// For an explanation of each field, see the corresponding field
+/// defined in [`AccumulatorArgs`].
+///
+/// The argument expressions, the ordering requirement and the
+/// `is_distinct` / `ignore_nulls` flags are not stored as separate fields;
+/// they travel inside `physical_expr_def` as an encoded protobuf message.
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+// The `FFI_` prefix used for all FFI types in this crate is not camel case.
+#[allow(non_camel_case_types)]
+pub struct FFI_AccumulatorArgs {
+    /// The aggregate's return field, exported as an Arrow C schema.
+    return_field: WrappedSchema,
+    /// The input schema, exported as an Arrow C schema.
+    schema: WrappedSchema,
+    is_reversed: bool,
+    name: RString,
+    /// Protobuf-encoded `PhysicalAggregateExprNode` holding the input
+    /// expressions, the ordering requirement and the distinct / ignore-nulls flags.
+    physical_expr_def: RVec<u8>,
+}
+
+impl TryFrom<AccumulatorArgs<'_>> for FFI_AccumulatorArgs {
+    type Error = DataFusionError;
+
+    /// Serializes borrowed [`AccumulatorArgs`] into an owned, ABI-stable form.
+    ///
+    /// The return field and input schema are exported as [`FFI_ArrowSchema`];
+    /// the physical expressions and ordering requirement are encoded into a
+    /// protobuf [`PhysicalAggregateExprNode`] using the default extension codec.
+    fn try_from(args: AccumulatorArgs) -> Result<Self, Self::Error> {
+        let ffi_return_field = FFI_ArrowSchema::try_from(args.return_field.as_ref())?;
+        let ffi_schema = FFI_ArrowSchema::try_from(args.schema)?;
+
+        let codec = DefaultPhysicalExtensionCodec {};
+        let proto_ordering =
+            serialize_physical_sort_exprs(args.ordering_req.to_owned(), &codec)?;
+        let proto_exprs = serialize_physical_exprs(args.exprs, &codec)?;
+
+        // `fun_definition` and `aggregate_function` stay unset: the receiving
+        // side only reads the expressions, the ordering and the two flags.
+        let encoded = PhysicalAggregateExprNode {
+            expr: proto_exprs,
+            ordering_req: proto_ordering,
+            distinct: args.is_distinct,
+            ignore_nulls: args.ignore_nulls,
+            fun_definition: None,
+            aggregate_function: None,
+        }
+        .encode_to_vec();
+
+        Ok(Self {
+            return_field: WrappedSchema(ffi_return_field),
+            schema: WrappedSchema(ffi_schema),
+            is_reversed: args.is_reversed,
+            name: args.name.into(),
+            physical_expr_def: encoded.into(),
+        })
+    }
+}
+
+/// This struct mirrors AccumulatorArgs except that it contains owned data.
+/// It is necessary to create this struct so that we can parse the protobuf
+/// data across the FFI boundary and turn it into owned data that
+/// AccumulatorArgs can then reference.
+///
+/// Obtain a borrowed [`AccumulatorArgs`] view with `(&foreign_args).into()`;
+/// each field has the same meaning as the identically named field there.
+pub struct ForeignAccumulatorArgs {
+    pub return_field: FieldRef,
+    pub schema: Schema,
+    pub ignore_nulls: bool,
+    pub ordering_req: LexOrdering,
+    pub is_reversed: bool,
+    pub name: String,
+    pub is_distinct: bool,
+    pub exprs: Vec<Arc<dyn PhysicalExpr>>,
+}
+
+impl TryFrom<FFI_AccumulatorArgs> for ForeignAccumulatorArgs {
+    type Error = DataFusionError;
+
+    /// Rebuilds owned accumulator arguments from their FFI representation.
+    ///
+    /// The protobuf-encoded expressions and ordering requirement are parsed
+    /// against the transported schema, using a fresh default
+    /// [`SessionContext`] and [`DefaultPhysicalExtensionCodec`].
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the protobuf payload cannot be decoded, if either
+    /// Arrow schema cannot be imported, or if an expression fails to parse.
+    fn try_from(value: FFI_AccumulatorArgs) -> Result<Self, Self::Error> {
+        let proto_def =
+            PhysicalAggregateExprNode::decode(value.physical_expr_def.as_ref())
+                .map_err(|e| DataFusionError::Execution(e.to_string()))?;
+
+        let return_field = Arc::new((&value.return_field.0).try_into()?);
+        let schema = Schema::try_from(&value.schema.0)?;
+
+        // NOTE(review): expressions requiring a custom extension codec
+        // presumably cannot be decoded here — confirm if that matters.
+        let default_ctx = SessionContext::new();
+        let codec = DefaultPhysicalExtensionCodec {};
+
+        let ordering_req = parse_physical_sort_exprs(
+            &proto_def.ordering_req,
+            &default_ctx,
+            &schema,
+            &codec,
+        )?;
+
+        let exprs = parse_physical_exprs(&proto_def.expr, &default_ctx, &schema, &codec)?;
+
+        Ok(Self {
+            return_field,
+            schema,
+            ignore_nulls: proto_def.ignore_nulls,
+            ordering_req,
+            is_reversed: value.is_reversed,
+            name: value.name.to_string(),
+            is_distinct: proto_def.distinct,
+            exprs,
+        })
+    }
+}
+
+impl<'a> From<&'a ForeignAccumulatorArgs> for AccumulatorArgs<'a> {
+    /// Borrows owned [`ForeignAccumulatorArgs`] as the [`AccumulatorArgs`]
+    /// view expected by DataFusion's accumulator constructors.
+    fn from(value: &'a ForeignAccumulatorArgs) -> Self {
+        // Exhaustive destructuring: adding a field to the owned struct forces
+        // this conversion to be revisited.
+        let ForeignAccumulatorArgs {
+            return_field,
+            schema,
+            ignore_nulls,
+            ordering_req,
+            is_reversed,
+            name,
+            is_distinct,
+            exprs,
+        } = value;
+
+        AccumulatorArgs {
+            return_field: Arc::clone(return_field),
+            schema,
+            ignore_nulls: *ignore_nulls,
+            ordering_req,
+            is_reversed: *is_reversed,
+            name: name.as_str(),
+            is_distinct: *is_distinct,
+            exprs: exprs.as_slice(),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{FFI_AccumulatorArgs, ForeignAccumulatorArgs};
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion::{
+        error::Result,
+        logical_expr::function::AccumulatorArgs,
+        physical_expr::{LexOrdering, PhysicalSortExpr},
+        physical_plan::expressions::col,
+    };
+
+    /// Converting `AccumulatorArgs` to the FFI form and back must produce
+    /// arguments that are indistinguishable from the originals.
+    #[test]
+    fn test_round_trip_accumulator_args() -> Result<()> {
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
+        let sort_on_a = PhysicalSortExpr {
+            expr: col("a", &schema)?,
+            options: Default::default(),
+        };
+        let ordering = LexOrdering::new(vec![sort_on_a]);
+        let input_exprs = [col("a", &schema)?];
+
+        let orig_args = AccumulatorArgs {
+            return_field: Field::new("f", DataType::Float64, true).into(),
+            schema: &schema,
+            ignore_nulls: false,
+            ordering_req: &ordering,
+            is_reversed: false,
+            name: "round_trip",
+            is_distinct: true,
+            exprs: &input_exprs,
+        };
+        let expected = format!("{orig_args:?}");
+
+        let ffi_args = FFI_AccumulatorArgs::try_from(orig_args)?;
+        let foreign_args = ForeignAccumulatorArgs::try_from(ffi_args)?;
+        let round_trip_args = AccumulatorArgs::from(&foreign_args);
+
+        // `AccumulatorArgs` does not implement `Eq`, so compare Debug output.
+        assert_eq!(expected, format!("{round_trip_args:?}"));
+
+        Ok(())
+    }
+}
diff --git a/datafusion/ffi/src/udaf/groups_accumulator.rs b/datafusion/ffi/src/udaf/groups_accumulator.rs
new file mode 100644
index 0000000000..58a18c69db
--- /dev/null
+++ b/datafusion/ffi/src/udaf/groups_accumulator.rs
@@ -0,0 +1,513 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{ffi::c_void, ops::Deref, sync::Arc};
+
+use crate::{
+    arrow_wrappers::{WrappedArray, WrappedSchema},
+    df_result, rresult, rresult_return,
+};
+use abi_stable::{
+    std_types::{ROption, RResult, RString, RVec},
+    StableAbi,
+};
+use arrow::{
+    array::{Array, ArrayRef, BooleanArray},
+    error::ArrowError,
+    ffi::to_ffi,
+};
+use datafusion::{
+    error::{DataFusionError, Result},
+    logical_expr::{EmitTo, GroupsAccumulator},
+};
+
+/// A stable struct for sharing [`GroupsAccumulator`] across FFI boundaries.
+/// For an explanation of each field, see the corresponding function
+/// defined in [`GroupsAccumulator`].
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+#[allow(non_camel_case_types)]
+pub struct FFI_GroupsAccumulator {
+    /// FFI equivalent to [`GroupsAccumulator::update_batch`].
+    pub update_batch: unsafe extern "C" fn(
+        accumulator: &mut Self,
+        values: RVec<WrappedArray>,
+        group_indices: RVec<usize>,
+        opt_filter: ROption<WrappedArray>,
+        total_num_groups: usize,
+    ) -> RResult<(), RString>,
+
+    /// FFI equivalent to [`GroupsAccumulator::evaluate`]. The per-group
+    /// results are returned as a single Arrow array.
+    pub evaluate: unsafe extern "C" fn(
+        accumulator: &mut Self,
+        emit_to: FFI_EmitTo,
+    ) -> RResult<WrappedArray, RString>,
+
+    /// FFI equivalent to [`GroupsAccumulator::size`].
+    pub size: unsafe extern "C" fn(accumulator: &Self) -> usize,
+
+    /// FFI equivalent to [`GroupsAccumulator::state`].
+    pub state: unsafe extern "C" fn(
+        accumulator: &mut Self,
+        emit_to: FFI_EmitTo,
+    ) -> RResult<RVec<WrappedArray>, RString>,
+
+    /// FFI equivalent to [`GroupsAccumulator::merge_batch`].
+    pub merge_batch: unsafe extern "C" fn(
+        accumulator: &mut Self,
+        values: RVec<WrappedArray>,
+        group_indices: RVec<usize>,
+        opt_filter: ROption<WrappedArray>,
+        total_num_groups: usize,
+    ) -> RResult<(), RString>,
+
+    /// FFI equivalent to [`GroupsAccumulator::convert_to_state`].
+    pub convert_to_state: unsafe extern "C" fn(
+        accumulator: &Self,
+        values: RVec<WrappedArray>,
+        opt_filter: ROption<WrappedArray>,
+    )
+        -> RResult<RVec<WrappedArray>, RString>,
+
+    /// Result of [`GroupsAccumulator::supports_convert_to_state`], captured
+    /// when this struct is created from the provider's accumulator.
+    pub supports_convert_to_state: bool,
+
+    /// Release the memory of the private data when it is no longer being used.
+    pub release: unsafe extern "C" fn(accumulator: &mut Self),
+
+    /// Internal data. This is only to be accessed by the provider of the accumulator.
+    /// A [`ForeignGroupsAccumulator`] should never attempt to access this data.
+    pub private_data: *mut c_void,
+}
+
+// SAFETY: NOTE(review): the raw `private_data` pointer is the only member that
+// blocks the auto traits. It points at a boxed `GroupsAccumulatorPrivateData`
+// owned by this struct, so these impls presumably rely on the wrapped
+// `GroupsAccumulator` being safe to send and share across threads — confirm.
+unsafe impl Send for FFI_GroupsAccumulator {}
+unsafe impl Sync for FFI_GroupsAccumulator {}
+
+/// Provider-side state referenced by [`FFI_GroupsAccumulator::private_data`].
+pub struct GroupsAccumulatorPrivateData {
+    /// The wrapped accumulator that every FFI call is forwarded to.
+    pub accumulator: Box<dyn GroupsAccumulator>,
+}
+
+impl FFI_GroupsAccumulator {
+    /// Returns a mutable reference to the wrapped accumulator.
+    ///
+    /// # Safety
+    ///
+    /// `private_data` must point to a live `GroupsAccumulatorPrivateData`,
+    /// i.e. this struct must have been built by the provider and not yet
+    /// released. Only the provider of the accumulator may call this.
+    #[inline]
+    unsafe fn inner_mut(&mut self) -> &mut Box<dyn GroupsAccumulator> {
+        let private_data = self.private_data as *mut GroupsAccumulatorPrivateData;
+        &mut (*private_data).accumulator
+    }
+
+    /// Returns a shared reference to the wrapped accumulator.
+    ///
+    /// # Safety
+    ///
+    /// Same requirements as [`Self::inner_mut`].
+    #[inline]
+    unsafe fn inner(&self) -> &dyn GroupsAccumulator {
+        let private_data = self.private_data as *const GroupsAccumulatorPrivateData;
+        (*private_data).accumulator.deref()
+    }
+}
+
+/// Imports each FFI array back into an [`ArrayRef`], stopping at the first
+/// array that fails to convert.
+fn process_values(values: RVec<WrappedArray>) -> Result<Vec<Arc<dyn Array>>> {
+    let mut arrays: Vec<ArrayRef> = Vec::with_capacity(values.len());
+    for wrapped in values {
+        let array: ArrayRef = wrapped.try_into()?;
+        arrays.push(array);
+    }
+    Ok(arrays)
+}
+
+/// Convert C-typed opt_filter into the internal type.
+///
+/// An absent filter maps to `Ok(None)`; a present one is imported and
+/// reinterpreted as a [`BooleanArray`].
+fn process_opt_filter(opt_filter: ROption<WrappedArray>) -> Result<Option<BooleanArray>> {
+    let Some(filter) = opt_filter.into_option() else {
+        return Ok(None);
+    };
+    let filter_array = ArrayRef::try_from(filter)?;
+    Ok(Some(BooleanArray::from(filter_array.into_data())))
+}
+
+/// Provider-side implementation of [`FFI_GroupsAccumulator::update_batch`].
+unsafe extern "C" fn update_batch_fn_wrapper(
+    accumulator: &mut FFI_GroupsAccumulator,
+    values: RVec<WrappedArray>,
+    group_indices: RVec<usize>,
+    opt_filter: ROption<WrappedArray>,
+    total_num_groups: usize,
+) -> RResult<(), RString> {
+    let inner = accumulator.inner_mut();
+    let arrays = rresult_return!(process_values(values));
+    let indices: Vec<usize> = group_indices.iter().copied().collect();
+    let filter = rresult_return!(process_opt_filter(opt_filter));
+
+    let outcome = inner.update_batch(&arrays, &indices, filter.as_ref(), total_num_groups);
+    rresult!(outcome)
+}
+
+/// Provider-side implementation of [`FFI_GroupsAccumulator::evaluate`].
+unsafe extern "C" fn evaluate_fn_wrapper(
+    accumulator: &mut FFI_GroupsAccumulator,
+    emit_to: FFI_EmitTo,
+) -> RResult<WrappedArray, RString> {
+    let emit_to: EmitTo = emit_to.into();
+    let evaluated = rresult_return!(accumulator.inner_mut().evaluate(emit_to));
+    rresult!(WrappedArray::try_from(&evaluated))
+}
+
+/// Provider-side implementation of [`FFI_GroupsAccumulator::size`].
+unsafe extern "C" fn size_fn_wrapper(accumulator: &FFI_GroupsAccumulator) -> usize {
+    accumulator.inner().size()
+}
+
+/// Provider-side implementation of [`FFI_GroupsAccumulator::state`].
+unsafe extern "C" fn state_fn_wrapper(
+    accumulator: &mut FFI_GroupsAccumulator,
+    emit_to: FFI_EmitTo,
+) -> RResult<RVec<WrappedArray>, RString> {
+    let emit_to: EmitTo = emit_to.into();
+    let state_arrays = rresult_return!(accumulator.inner_mut().state(emit_to));
+
+    let exported: Result<RVec<WrappedArray>> = state_arrays
+        .iter()
+        .map(|array| WrappedArray::try_from(array).map_err(DataFusionError::from))
+        .collect();
+    rresult!(exported)
+}
+
+/// Provider-side implementation of [`FFI_GroupsAccumulator::merge_batch`].
+unsafe extern "C" fn merge_batch_fn_wrapper(
+    accumulator: &mut FFI_GroupsAccumulator,
+    values: RVec<WrappedArray>,
+    group_indices: RVec<usize>,
+    opt_filter: ROption<WrappedArray>,
+    total_num_groups: usize,
+) -> RResult<(), RString> {
+    let inner = accumulator.inner_mut();
+    let arrays = rresult_return!(process_values(values));
+    let indices: Vec<usize> = group_indices.iter().copied().collect();
+    let filter = rresult_return!(process_opt_filter(opt_filter));
+
+    let outcome = inner.merge_batch(&arrays, &indices, filter.as_ref(), total_num_groups);
+    rresult!(outcome)
+}
+
+/// Provider-side implementation of [`FFI_GroupsAccumulator::convert_to_state`].
+unsafe extern "C" fn convert_to_state_fn_wrapper(
+    accumulator: &FFI_GroupsAccumulator,
+    values: RVec<WrappedArray>,
+    opt_filter: ROption<WrappedArray>,
+) -> RResult<RVec<WrappedArray>, RString> {
+    let inner = accumulator.inner();
+    let arrays = rresult_return!(process_values(values));
+    let filter = rresult_return!(process_opt_filter(opt_filter));
+    let converted = rresult_return!(inner.convert_to_state(&arrays, filter.as_ref()));
+
+    let exported: Result<RVec<WrappedArray>> = converted
+        .iter()
+        .map(|array| WrappedArray::try_from(array).map_err(DataFusionError::from))
+        .collect();
+    rresult!(exported)
+}
+
+/// Frees the provider's private data; invoked when an
+/// [`FFI_GroupsAccumulator`] is dropped.
+unsafe extern "C" fn release_fn_wrapper(accumulator: &mut FFI_GroupsAccumulator) {
+    // Re-box the pointer produced by `Box::into_raw` so the accumulator is dropped.
+    let private_data = accumulator.private_data.cast::<GroupsAccumulatorPrivateData>();
+    drop(Box::from_raw(private_data));
+}
+
+impl From<Box<dyn GroupsAccumulator>> for FFI_GroupsAccumulator {
+    /// Wraps a provider-side accumulator so it can cross the FFI boundary.
+    /// Ownership moves into heap-allocated private data that `release` frees.
+    fn from(accumulator: Box<dyn GroupsAccumulator>) -> Self {
+        // Capture the flag before the accumulator is moved into private data.
+        let supports_convert_to_state = accumulator.supports_convert_to_state();
+        let private_data: *mut c_void =
+            Box::into_raw(Box::new(GroupsAccumulatorPrivateData { accumulator })).cast();
+
+        Self {
+            update_batch: update_batch_fn_wrapper,
+            evaluate: evaluate_fn_wrapper,
+            size: size_fn_wrapper,
+            state: state_fn_wrapper,
+            merge_batch: merge_batch_fn_wrapper,
+            convert_to_state: convert_to_state_fn_wrapper,
+            supports_convert_to_state,
+            release: release_fn_wrapper,
+            private_data,
+        }
+    }
+}
+
+impl Drop for FFI_GroupsAccumulator {
+    /// Frees the provider's private data through the stored `release` callback.
+    fn drop(&mut self) {
+        let release = self.release;
+        // SAFETY: `release` was installed by the provider together with
+        // `private_data`, and `drop` runs at most once per value.
+        unsafe { release(self) }
+    }
+}
+
+/// This struct is used to access a [`GroupsAccumulator`] provided by a foreign
+/// library across a FFI boundary.
+///
+/// The ForeignGroupsAccumulator is to be used by the caller of the UDF, so it has
+/// no knowledge or access to the private data. All interaction with the
+/// accumulator must occur through the functions defined in FFI_GroupsAccumulator.
+#[derive(Debug)]
+pub struct ForeignGroupsAccumulator {
+    accumulator: FFI_GroupsAccumulator,
+}
+
+// SAFETY: NOTE(review): the only field is an `FFI_GroupsAccumulator`, which is
+// itself declared `Send`/`Sync`, so these impls restate that guarantee — confirm.
+unsafe impl Send for ForeignGroupsAccumulator {}
+unsafe impl Sync for ForeignGroupsAccumulator {}
+
+impl From<FFI_GroupsAccumulator> for ForeignGroupsAccumulator {
+    /// Takes ownership of an FFI accumulator received from a provider.
+    fn from(ffi_accumulator: FFI_GroupsAccumulator) -> Self {
+        ForeignGroupsAccumulator {
+            accumulator: ffi_accumulator,
+        }
+    }
+}
+
+/// Forwards every [`GroupsAccumulator`] call across the FFI boundary to the
+/// provider's accumulator.
+impl GroupsAccumulator for ForeignGroupsAccumulator {
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indices: &[usize],
+        opt_filter: Option<&BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()> {
+        let values = export_values(values)?;
+        let group_indices: RVec<usize> = group_indices.iter().copied().collect();
+        let opt_filter = export_opt_filter(opt_filter)?;
+
+        // SAFETY: the function pointer and `self.accumulator` come from the same
+        // provider-created `FFI_GroupsAccumulator`, which is still alive.
+        unsafe {
+            df_result!((self.accumulator.update_batch)(
+                &mut self.accumulator,
+                values,
+                group_indices,
+                opt_filter,
+                total_num_groups
+            ))
+        }
+    }
+
+    fn size(&self) -> usize {
+        // SAFETY: see `update_batch`.
+        unsafe { (self.accumulator.size)(&self.accumulator) }
+    }
+
+    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
+        // SAFETY: see `update_batch`.
+        let return_array = unsafe {
+            df_result!((self.accumulator.evaluate)(
+                &mut self.accumulator,
+                emit_to.into()
+            ))
+        }?;
+
+        return_array.try_into().map_err(DataFusionError::from)
+    }
+
+    fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
+        // SAFETY: see `update_batch`.
+        let returned_arrays = unsafe {
+            df_result!((self.accumulator.state)(
+                &mut self.accumulator,
+                emit_to.into()
+            ))
+        }?;
+
+        returned_arrays
+            .into_iter()
+            .map(|wrapped_array| wrapped_array.try_into().map_err(DataFusionError::from))
+            .collect::<Result<Vec<_>>>()
+    }
+
+    fn merge_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indices: &[usize],
+        opt_filter: Option<&BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()> {
+        let values = export_values(values)?;
+        let group_indices: RVec<usize> = group_indices.iter().copied().collect();
+        let opt_filter = export_opt_filter(opt_filter)?;
+
+        // SAFETY: see `update_batch`.
+        unsafe {
+            df_result!((self.accumulator.merge_batch)(
+                &mut self.accumulator,
+                values,
+                group_indices,
+                opt_filter,
+                total_num_groups
+            ))
+        }
+    }
+
+    fn convert_to_state(
+        &self,
+        values: &[ArrayRef],
+        opt_filter: Option<&BooleanArray>,
+    ) -> Result<Vec<ArrayRef>> {
+        let values = export_values(values)?;
+        let opt_filter = export_opt_filter(opt_filter)?;
+
+        // SAFETY: see `update_batch`.
+        let returned_array = unsafe {
+            df_result!((self.accumulator.convert_to_state)(
+                &self.accumulator,
+                values,
+                opt_filter
+            ))
+        }?;
+
+        returned_array
+            .into_iter()
+            .map(|arr| arr.try_into().map_err(DataFusionError::from))
+            .collect()
+    }
+
+    fn supports_convert_to_state(&self) -> bool {
+        self.accumulator.supports_convert_to_state
+    }
+}
+
+/// Exports each input array through the Arrow C data interface so it can be
+/// handed to the provider; stops at the first array that fails to export.
+fn export_values(values: &[ArrayRef]) -> Result<RVec<WrappedArray>> {
+    values
+        .iter()
+        .map(WrappedArray::try_from)
+        .collect::<std::result::Result<RVec<_>, ArrowError>>()
+        .map_err(DataFusionError::from)
+}
+
+/// Exports the optional boolean filter through the Arrow C data interface,
+/// mapping an absent filter to `RNone`.
+fn export_opt_filter(opt_filter: Option<&BooleanArray>) -> Result<ROption<WrappedArray>> {
+    let wrapped = opt_filter
+        .map(|bool_array| to_ffi(&bool_array.to_data()))
+        .transpose()?
+        .map(|(array, schema)| WrappedArray {
+            array,
+            schema: WrappedSchema(schema),
+        });
+    Ok(wrapped.into())
+}
+
+/// FFI-stable mirror of [`EmitTo`], passed to the `evaluate` and `state`
+/// callbacks of [`FFI_GroupsAccumulator`].
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+#[allow(non_camel_case_types)]
+pub enum FFI_EmitTo {
+    /// Mirrors [`EmitTo::All`].
+    All,
+    /// Mirrors [`EmitTo::First`], carrying the same `usize` payload.
+    First(usize),
+}
+
+impl From<EmitTo> for FFI_EmitTo {
+    /// Converts DataFusion's [`EmitTo`] into its FFI-stable mirror.
+    fn from(value: EmitTo) -> Self {
+        match value {
+            EmitTo::First(n) => FFI_EmitTo::First(n),
+            EmitTo::All => FFI_EmitTo::All,
+        }
+    }
+}
+
+impl From<FFI_EmitTo> for EmitTo {
+    /// Converts an [`FFI_EmitTo`] back into DataFusion's [`EmitTo`].
+    fn from(value: FFI_EmitTo) -> Self {
+        match value {
+            FFI_EmitTo::First(n) => EmitTo::First(n),
+            FFI_EmitTo::All => EmitTo::All,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::array::{make_array, Array, BooleanArray};
+    use datafusion::{
+        common::create_array,
+        error::Result,
+        logical_expr::{EmitTo, GroupsAccumulator},
+    };
+    use datafusion_functions_aggregate_common::aggregate::groups_accumulator::bool_op::BooleanGroupsAccumulator;
+
+    use super::{FFI_EmitTo, FFI_GroupsAccumulator, ForeignGroupsAccumulator};
+
+    /// Exercises a boolean AND groups accumulator after round-tripping it
+    /// through `FFI_GroupsAccumulator` / `ForeignGroupsAccumulator`.
+    #[test]
+    fn test_foreign_bool_and_groups_accumulator() -> Result<()> {
+        let boxed_accum: Box<dyn GroupsAccumulator> =
+            Box::new(BooleanGroupsAccumulator::new(|a, b| a && b, true));
+        let ffi_accum: FFI_GroupsAccumulator = boxed_accum.into();
+        let mut foreign_accum: ForeignGroupsAccumulator = ffi_accum.into();
+
+        // Three groups of two values each. The filter keeps both values of
+        // groups 0 and 1 and neither of group 2, so AND-ing yields
+        // group 0 = true, group 1 = true && false = false, group 2 = null.
+        let values = create_array!(Boolean, vec![true, true, true, false, true, true]);
+        let opt_filter =
+            create_array!(Boolean, vec![true, true, true, true, false, false]);
+        foreign_accum.update_batch(
+            &[values],
+            &[0, 0, 1, 1, 2, 2],
+            Some(opt_filter.as_ref()),
+            3,
+        )?;
+
+        let groups_bool = foreign_accum.evaluate(EmitTo::All)?;
+        let groups_bool = groups_bool.as_any().downcast_ref::<BooleanArray>().unwrap();
+
+        assert_eq!(
+            groups_bool,
+            create_array!(Boolean, vec![Some(true), Some(false), None]).as_ref()
+        );
+
+        let state = foreign_accum.state(EmitTo::All)?;
+        assert_eq!(state.len(), 1);
+
+        // Merge a single partial state of `false` into group 0; AND-ing with
+        // `false` must yield `false`.
+        let second_states =
+            vec![make_array(create_array!(Boolean, vec![false]).to_data())];
+
+        let opt_filter = create_array!(Boolean, vec![true]);
+        foreign_accum.merge_batch(&second_states, &[0], Some(opt_filter.as_ref()), 1)?;
+        let groups_bool = foreign_accum.evaluate(EmitTo::All)?;
+        assert_eq!(groups_bool.len(), 1);
+        assert_eq!(
+            groups_bool.as_ref(),
+            make_array(create_array!(Boolean, vec![false]).to_data()).as_ref()
+        );
+
+        // Converting a single unfiltered `false` value to state keeps it `false`.
+        let values = create_array!(Boolean, vec![false]);
+        let opt_filter = create_array!(Boolean, vec![true]);
+        let groups_bool =
+            foreign_accum.convert_to_state(&[values], Some(opt_filter.as_ref()))?;
+
+        assert_eq!(
+            groups_bool[0].as_ref(),
+            make_array(create_array!(Boolean, vec![false]).to_data()).as_ref()
+        );
+
+        Ok(())
+    }
+
+    fn test_emit_to_round_trip(value: EmitTo) -> Result<()> {
+        let ffi_value: FFI_EmitTo = value.into();
+        let round_trip_value: EmitTo = ffi_value.into();
+
+        assert_eq!(value, round_trip_value);
+        Ok(())
+    }
+
+    /// This test ensures all enum values are properly translated
+    #[test]
+    fn test_all_emit_to_round_trip() -> Result<()> {
+        test_emit_to_round_trip(EmitTo::All)?;
+        test_emit_to_round_trip(EmitTo::First(10))?;
+
+        Ok(())
+    }
+}
diff --git a/datafusion/ffi/src/udaf/mod.rs b/datafusion/ffi/src/udaf/mod.rs
new file mode 100644
index 0000000000..2529ed7a06
--- /dev/null
+++ b/datafusion/ffi/src/udaf/mod.rs
@@ -0,0 +1,733 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{ffi::c_void, sync::Arc};
+
+use abi_stable::{
+    std_types::{ROption, RResult, RStr, RString, RVec},
+    StableAbi,
+};
+use accumulator::{FFI_Accumulator, ForeignAccumulator};
+use accumulator_args::{FFI_AccumulatorArgs, ForeignAccumulatorArgs};
+use arrow::datatypes::{DataType, Field};
+use arrow::ffi::FFI_ArrowSchema;
+use arrow_schema::FieldRef;
+use datafusion::{
+    error::DataFusionError,
+    logical_expr::{
+        function::{AccumulatorArgs, AggregateFunctionSimplification, 
StateFieldsArgs},
+        type_coercion::functions::fields_with_aggregate_udf,
+        utils::AggregateOrderSensitivity,
+        Accumulator, GroupsAccumulator,
+    },
+};
+use datafusion::{
+    error::Result,
+    logical_expr::{AggregateUDF, AggregateUDFImpl, Signature},
+};
+use datafusion_proto_common::from_proto::parse_proto_fields_to_fields;
+use groups_accumulator::{FFI_GroupsAccumulator, ForeignGroupsAccumulator};
+
+use crate::util::{rvec_wrapped_to_vec_fieldref, vec_fieldref_to_rvec_wrapped};
+use crate::{
+    arrow_wrappers::WrappedSchema,
+    df_result, rresult, rresult_return,
+    util::{rvec_wrapped_to_vec_datatype, vec_datatype_to_rvec_wrapped},
+    volatility::FFI_Volatility,
+};
+use prost::{DecodeError, Message};
+
+mod accumulator;
+mod accumulator_args;
+mod groups_accumulator;
+
+/// A stable struct for sharing a [`AggregateUDF`] across FFI boundaries.
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+#[allow(non_camel_case_types)]
+pub struct FFI_AggregateUDF {
+    /// FFI equivalent to the `name` of a [`AggregateUDF`]
+    pub name: RString,
+
+    /// FFI equivalent to the `aliases` of a [`AggregateUDF`]
+    pub aliases: RVec<RString>,
+
+    /// FFI equivalent to the `volatility` of a [`AggregateUDF`]
+    pub volatility: FFI_Volatility,
+
+    /// Determines the return type of the underlying [`AggregateUDF`] based on the
+    /// argument types.
+    pub return_type: unsafe extern "C" fn(
+        udaf: &Self,
+        arg_types: RVec<WrappedSchema>,
+    ) -> RResult<WrappedSchema, RString>,
+
+    /// FFI equivalent to the `is_nullable` of a [`AggregateUDF`]
+    pub is_nullable: bool,
+
+    /// FFI equivalent to [`AggregateUDF::groups_accumulator_supported`]
+    pub groups_accumulator_supported:
+        unsafe extern "C" fn(udaf: &FFI_AggregateUDF, args: FFI_AccumulatorArgs) -> bool,
+
+    /// FFI equivalent to [`AggregateUDF::accumulator`]
+    pub accumulator: unsafe extern "C" fn(
+        udaf: &FFI_AggregateUDF,
+        args: FFI_AccumulatorArgs,
+    ) -> RResult<FFI_Accumulator, RString>,
+
+    /// FFI equivalent to [`AggregateUDF::create_sliding_accumulator`]
+    pub create_sliding_accumulator:
+        unsafe extern "C" fn(
+            udaf: &FFI_AggregateUDF,
+            args: FFI_AccumulatorArgs,
+        ) -> RResult<FFI_Accumulator, RString>,
+
+    /// FFI equivalent to [`AggregateUDF::state_fields`]
+    // Presumably the nested `RVec<RVec<u8>>` types are what trip clippy's
+    // `type_complexity` lint on this signature.
+    #[allow(clippy::type_complexity)]
+    pub state_fields: unsafe extern "C" fn(
+        udaf: &FFI_AggregateUDF,
+        name: &RStr,
+        input_fields: RVec<WrappedSchema>,
+        return_field: WrappedSchema,
+        ordering_fields: RVec<RVec<u8>>,
+        is_distinct: bool,
+    ) -> RResult<RVec<RVec<u8>>, RString>,
+
+    /// FFI equivalent to [`AggregateUDF::create_groups_accumulator`]
+    pub create_groups_accumulator:
+        unsafe extern "C" fn(
+            udaf: &FFI_AggregateUDF,
+            args: FFI_AccumulatorArgs,
+        ) -> RResult<FFI_GroupsAccumulator, RString>,
+
+    /// FFI equivalent to [`AggregateUDF::with_beneficial_ordering`]
+    pub with_beneficial_ordering:
+        unsafe extern "C" fn(
+            udaf: &FFI_AggregateUDF,
+            beneficial_ordering: bool,
+        ) -> RResult<ROption<FFI_AggregateUDF>, RString>,
+
+    /// FFI equivalent to [`AggregateUDF::order_sensitivity`]
+    pub order_sensitivity:
+        unsafe extern "C" fn(udaf: &FFI_AggregateUDF) -> FFI_AggregateOrderSensitivity,
+
+    /// Performs type coercion. To simplify this interface, all UDFs are treated as having
+    /// user defined signatures, which in turn causes `coerce_types` to be called. This
+    /// call should be transparent to most users as the internal function performs the
+    /// appropriate calls on the underlying [`AggregateUDF`]
+    pub coerce_types: unsafe extern "C" fn(
+        udf: &Self,
+        arg_types: RVec<WrappedSchema>,
+    ) -> RResult<RVec<WrappedSchema>, RString>,
+
+    /// Used to create a clone on the provider of the udaf. This should
+    /// only need to be called by the receiver of the udaf.
+    pub clone: unsafe extern "C" fn(udaf: &Self) -> Self,
+
+    /// Release the memory of the private data when it is no longer being used.
+    pub release: unsafe extern "C" fn(udaf: &mut Self),
+
+    /// Internal data. This is only to be accessed by the provider of the udaf.
+    /// A [`ForeignAggregateUDF`] should never attempt to access this data.
+    pub private_data: *mut c_void,
+}
+
+// SAFETY: NOTE(review): the raw `private_data` pointer is what blocks the auto
+// traits. It points at an `AggregateUDFPrivateData` holding an
+// `Arc<AggregateUDF>`, so these impls presumably rely on `AggregateUDF`
+// being `Send + Sync` — confirm.
+unsafe impl Send for FFI_AggregateUDF {}
+unsafe impl Sync for FFI_AggregateUDF {}
+
+/// Provider-side state referenced by [`FFI_AggregateUDF::private_data`].
+pub struct AggregateUDFPrivateData {
+    /// The UDAF that every FFI call is forwarded to.
+    pub udaf: Arc<AggregateUDF>,
+}
+
+impl FFI_AggregateUDF {
+    /// Returns the wrapped UDAF.
+    ///
+    /// # Safety
+    ///
+    /// `private_data` must point to a live `AggregateUDFPrivateData`. Only the
+    /// provider that created this struct may call this.
+    unsafe fn inner(&self) -> &Arc<AggregateUDF> {
+        let private_data = self.private_data as *const AggregateUDFPrivateData;
+        &(*private_data).udaf
+    }
+}
+
+/// Provider-side implementation of [`FFI_AggregateUDF::return_type`].
+unsafe extern "C" fn return_type_fn_wrapper(
+    udaf: &FFI_AggregateUDF,
+    arg_types: RVec<WrappedSchema>,
+) -> RResult<WrappedSchema, RString> {
+    let inner_udaf = udaf.inner();
+    let data_types = rresult_return!(rvec_wrapped_to_vec_datatype(&arg_types));
+
+    let return_type = rresult_return!(inner_udaf.return_type(&data_types));
+    let exported = FFI_ArrowSchema::try_from(return_type)
+        .map(WrappedSchema)
+        .map_err(DataFusionError::from);
+    rresult!(exported)
+}
+
+/// Provider-side implementation of [`FFI_AggregateUDF::accumulator`].
+unsafe extern "C" fn accumulator_fn_wrapper(
+    udaf: &FFI_AggregateUDF,
+    args: FFI_AccumulatorArgs,
+) -> RResult<FFI_Accumulator, RString> {
+    // The owned arguments must outlive the borrowed `AccumulatorArgs` view.
+    let foreign_args = rresult_return!(ForeignAccumulatorArgs::try_from(args));
+    let accumulator = udaf.inner().accumulator((&foreign_args).into());
+    rresult!(accumulator.map(FFI_Accumulator::from))
+}
+
+/// Provider-side implementation of [`FFI_AggregateUDF::create_sliding_accumulator`].
+unsafe extern "C" fn create_sliding_accumulator_fn_wrapper(
+    udaf: &FFI_AggregateUDF,
+    args: FFI_AccumulatorArgs,
+) -> RResult<FFI_Accumulator, RString> {
+    // The owned arguments must outlive the borrowed `AccumulatorArgs` view.
+    let foreign_args = rresult_return!(ForeignAccumulatorArgs::try_from(args));
+    let accumulator = udaf
+        .inner()
+        .create_sliding_accumulator((&foreign_args).into());
+    rresult!(accumulator.map(FFI_Accumulator::from))
+}
+
+/// FFI entry point that asks the wrapped UDAF for a groups accumulator and
+/// exports it as an [`FFI_GroupsAccumulator`].
+unsafe extern "C" fn create_groups_accumulator_fn_wrapper(
+    udaf: &FFI_AggregateUDF,
+    args: FFI_AccumulatorArgs,
+) -> RResult<FFI_GroupsAccumulator, RString> {
+    let inner_udaf = udaf.inner();
+    let foreign_args = rresult_return!(ForeignAccumulatorArgs::try_from(args));
+
+    let accumulator = inner_udaf.create_groups_accumulator((&foreign_args).into());
+    rresult!(accumulator.map(FFI_GroupsAccumulator::from))
+}
+
+/// FFI entry point for `groups_accumulator_supported`.
+///
+/// If the accumulator arguments cannot be decoded, a warning is logged and
+/// `false` is returned instead of propagating an error.
+unsafe extern "C" fn groups_accumulator_supported_fn_wrapper(
+    udaf: &FFI_AggregateUDF,
+    args: FFI_AccumulatorArgs,
+) -> bool {
+    let inner_udaf = udaf.inner();
+
+    match ForeignAccumulatorArgs::try_from(args) {
+        Ok(foreign_args) => {
+            inner_udaf.groups_accumulator_supported((&foreign_args).into())
+        }
+        Err(e) => {
+            log::warn!("Unable to parse accumulator args. {e}");
+            false
+        }
+    }
+}
+
+/// FFI entry point for `with_beneficial_ordering`.
+///
+/// Clones the provider-side [`AggregateUDF`] and applies the requested
+/// ordering preference exactly once. A `None` from the wrapped function is
+/// forwarded as `RNone`; `Some(func)` is exported as a new `FFI_AggregateUDF`.
+/// Errors are returned to the caller as `RErr` strings.
+unsafe extern "C" fn with_beneficial_ordering_fn_wrapper(
+    udaf: &FFI_AggregateUDF,
+    beneficial_ordering: bool,
+) -> RResult<ROption<FFI_AggregateUDF>, RString> {
+    let udaf = udaf.inner().as_ref().clone();
+
+    // Apply the preference a single time. Re-applying it to the returned
+    // function is redundant and, for implementations that answer `None` when
+    // nothing changes, would throw away the first call's `Some(..)` result.
+    let result = rresult_return!(udaf.with_beneficial_ordering(beneficial_ordering))
+        .map(|func| FFI_AggregateUDF::from(Arc::new(func)));
+
+    RResult::ROk(result.into())
+}
+
+/// FFI entry point for `state_fields`.
+///
+/// The input and return fields arrive as Arrow C schemas. The ordering fields
+/// arrive as protobuf-encoded `Field` bytes. The resulting state fields are
+/// sent back protobuf-encoded too.
+unsafe extern "C" fn state_fields_fn_wrapper(
+    udaf: &FFI_AggregateUDF,
+    name: &RStr,
+    input_fields: RVec<WrappedSchema>,
+    return_field: WrappedSchema,
+    ordering_fields: RVec<RVec<u8>>,
+    is_distinct: bool,
+) -> RResult<RVec<RVec<u8>>, RString> {
+    let inner_udaf = udaf.inner();
+
+    let decoded_inputs = rresult_return!(rvec_wrapped_to_vec_fieldref(&input_fields));
+    let decoded_return = Arc::new(rresult_return!(Field::try_from(&return_field.0)));
+
+    // Ordering fields: raw bytes -> protobuf messages -> Arrow fields.
+    let proto_ordering = rresult_return!(ordering_fields
+        .into_iter()
+        .map(|bytes| datafusion_proto_common::Field::decode(bytes.as_ref()))
+        .collect::<std::result::Result<Vec<_>, DecodeError>>());
+    let decoded_ordering: Vec<_> =
+        rresult_return!(parse_proto_fields_to_fields(&proto_ordering))
+            .into_iter()
+            .map(Arc::new)
+            .collect();
+
+    let state_fields = rresult_return!(inner_udaf.state_fields(StateFieldsArgs {
+        name: name.as_str(),
+        input_fields: &decoded_inputs,
+        return_field: decoded_return,
+        ordering_fields: &decoded_ordering,
+        is_distinct,
+    }));
+
+    // Encode every state field as protobuf bytes; stop at the first failure.
+    let mut encoded: Vec<RVec<u8>> = Vec::with_capacity(state_fields.len());
+    for field in &state_fields {
+        let proto = rresult_return!(
+            datafusion_proto::protobuf::Field::try_from(field.as_ref())
+                .map_err(DataFusionError::from)
+        );
+        encoded.push(proto.encode_to_vec().into());
+    }
+
+    RResult::ROk(RVec::from(encoded))
+}
+
+/// FFI entry point reporting the wrapped UDAF's ordering sensitivity.
+unsafe extern "C" fn order_sensitivity_fn_wrapper(
+    udaf: &FFI_AggregateUDF,
+) -> FFI_AggregateOrderSensitivity {
+    let sensitivity = udaf.inner().order_sensitivity();
+    FFI_AggregateOrderSensitivity::from(sensitivity)
+}
+
+/// FFI entry point for `coerce_types`.
+///
+/// Each incoming data type is wrapped in a nullable placeholder field named
+/// `"f"` and passed to `fields_with_aggregate_udf`. Only the data types of the
+/// resulting fields are sent back.
+unsafe extern "C" fn coerce_types_fn_wrapper(
+    udaf: &FFI_AggregateUDF,
+    arg_types: RVec<WrappedSchema>,
+) -> RResult<RVec<WrappedSchema>, RString> {
+    let inner_udaf = udaf.inner();
+    let requested = rresult_return!(rvec_wrapped_to_vec_datatype(&arg_types));
+
+    let placeholder_fields: Vec<_> = requested
+        .into_iter()
+        .map(|data_type| Arc::new(Field::new("f", data_type, true)))
+        .collect();
+
+    let coerced_fields =
+        rresult_return!(fields_with_aggregate_udf(&placeholder_fields, inner_udaf));
+    let coerced_types: Vec<DataType> = coerced_fields
+        .iter()
+        .map(|field| field.data_type().clone())
+        .collect();
+
+    rresult!(vec_datatype_to_rvec_wrapped(&coerced_types))
+}
+
+/// Reclaim and free the boxed `AggregateUDFPrivateData` owned by `udaf`.
+///
+/// It is called from `Drop for FFI_AggregateUDF` and must run at most once
+/// per value, because it rebuilds the `Box` from the raw pointer.
+unsafe extern "C" fn release_fn_wrapper(udaf: &mut FFI_AggregateUDF) {
+    let data_ptr = udaf.private_data.cast::<AggregateUDFPrivateData>();
+    // Rebuilding the Box hands ownership back to Rust. Dropping it frees the
+    // allocation and releases this handle's reference to the UDAF.
+    drop(Box::from_raw(data_ptr));
+}
+
+/// Produce a new `FFI_AggregateUDF` that shares the same underlying
+/// [`AggregateUDF`] through an additional `Arc` reference.
+unsafe extern "C" fn clone_fn_wrapper(udaf: &FFI_AggregateUDF) -> FFI_AggregateUDF {
+    let shared = Arc::clone(udaf.inner());
+    FFI_AggregateUDF::from(shared)
+}
+
+impl Clone for FFI_AggregateUDF {
+    /// Clone through the `clone` callback, so that the copy is produced by the
+    /// provider of the UDAF that owns `private_data`.
+    fn clone(&self) -> Self {
+        let clone_fn = self.clone;
+        unsafe { clone_fn(self) }
+    }
+}
+
+impl From<Arc<AggregateUDF>> for FFI_AggregateUDF {
+    /// Export a provider-side [`AggregateUDF`] across the FFI boundary.
+    ///
+    /// Plain metadata (name, aliases, nullability, volatility) is copied into
+    /// the struct eagerly. All other behaviour is reached through the
+    /// `*_fn_wrapper` callbacks, which recover the UDAF from the boxed
+    /// `AggregateUDFPrivateData` stored in `private_data`.
+    fn from(udaf: Arc<AggregateUDF>) -> Self {
+        let name = udaf.name().into();
+        let aliases = udaf
+            .aliases()
+            .iter()
+            .map(|alias| RString::from(alias.as_str()))
+            .collect();
+        let is_nullable = udaf.is_nullable();
+        let volatility = udaf.signature().volatility.into();
+
+        // The Arc moves into the box; `release_fn_wrapper` reclaims it when
+        // this struct is dropped.
+        let private_data: *mut AggregateUDFPrivateData =
+            Box::into_raw(Box::new(AggregateUDFPrivateData { udaf }));
+
+        FFI_AggregateUDF {
+            name,
+            aliases,
+            is_nullable,
+            volatility,
+            return_type: return_type_fn_wrapper,
+            accumulator: accumulator_fn_wrapper,
+            create_sliding_accumulator: create_sliding_accumulator_fn_wrapper,
+            create_groups_accumulator: create_groups_accumulator_fn_wrapper,
+            groups_accumulator_supported: groups_accumulator_supported_fn_wrapper,
+            with_beneficial_ordering: with_beneficial_ordering_fn_wrapper,
+            state_fields: state_fields_fn_wrapper,
+            order_sensitivity: order_sensitivity_fn_wrapper,
+            coerce_types: coerce_types_fn_wrapper,
+            clone: clone_fn_wrapper,
+            release: release_fn_wrapper,
+            private_data: private_data.cast(),
+        }
+    }
+}
+
+impl Drop for FFI_AggregateUDF {
+    /// Hand the private data back to the `release` callback for deallocation.
+    fn drop(&mut self) {
+        let release_fn = self.release;
+        unsafe { release_fn(self) }
+    }
+}
+
+/// This struct is used to access a UDAF provided by a foreign
+/// library across a FFI boundary.
+///
+/// The ForeignAggregateUDF is to be used by the caller of the UDAF, so it has
+/// no knowledge or access to the private data. All interaction with the UDAF
+/// must occur through the functions defined in FFI_AggregateUDF.
+#[derive(Debug)]
+pub struct ForeignAggregateUDF {
+    /// Locally built signature (the FFI struct exposes only the volatility).
+    signature: Signature,
+    /// Aliases copied out of the FFI struct so they can be returned by reference.
+    aliases: Vec<String>,
+    /// The FFI handle whose function pointers serve most trait methods.
+    udaf: FFI_AggregateUDF,
+}
+
+// SAFETY: the only member that is not thread-safe by construction is the
+// wrapped `FFI_AggregateUDF`, which has its own `unsafe impl Send/Sync`.
+// NOTE(review): if `Signature` is already `Send + Sync`, these explicit impls
+// may be redundant with the auto traits — confirm before relying on them.
+unsafe impl Send for ForeignAggregateUDF {}
+unsafe impl Sync for ForeignAggregateUDF {}
+
+impl TryFrom<&FFI_AggregateUDF> for ForeignAggregateUDF {
+    type Error = DataFusionError;
+
+    /// Build a caller-side wrapper from an FFI handle.
+    ///
+    /// The handle is cloned through its `clone` callback. The signature is
+    /// rebuilt as `user_defined` with the reported volatility, and the aliases
+    /// are copied into owned strings. The current implementation never
+    /// returns `Err`.
+    fn try_from(udaf: &FFI_AggregateUDF) -> Result<Self, Self::Error> {
+        let aliases: Vec<String> = udaf
+            .aliases
+            .iter()
+            .map(|alias| alias.as_str().to_owned())
+            .collect();
+        let volatility = (&udaf.volatility).into();
+
+        Ok(ForeignAggregateUDF {
+            signature: Signature::user_defined(volatility),
+            aliases,
+            udaf: udaf.clone(),
+        })
+    }
+}
+
+/// Caller-side [`AggregateUDFImpl`] that forwards to a foreign UDAF.
+///
+/// `name`, `signature`, `aliases`, `is_nullable` and `simplify` are answered
+/// locally. Every other method invokes one of the `FFI_AggregateUDF` function
+/// pointers, and each `unsafe` block below contains only that single call.
+impl AggregateUDFImpl for ForeignAggregateUDF {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        self.udaf.name.as_str()
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        let arg_types = vec_datatype_to_rvec_wrapped(arg_types)?;
+
+        // SAFETY: `self.udaf` is owned by `self`, so it is alive for the call,
+        // and its function pointers were populated by the providing library.
+        let result = unsafe { (self.udaf.return_type)(&self.udaf, arg_types) };
+
+        df_result!(result)
+            .and_then(|schema| (&schema.0).try_into().map_err(DataFusionError::from))
+    }
+
+    fn is_nullable(&self) -> bool {
+        self.udaf.is_nullable
+    }
+
+    fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
+        let args = acc_args.try_into()?;
+
+        // SAFETY: `self.udaf` outlives this call and was built by the provider.
+        let accumulator = unsafe { (self.udaf.accumulator)(&self.udaf, args) };
+
+        df_result!(accumulator).map(|accum| {
+            Box::new(ForeignAccumulator::from(accum)) as Box<dyn Accumulator>
+        })
+    }
+
+    fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
+        let name = RStr::from_str(args.name);
+        let input_fields = vec_fieldref_to_rvec_wrapped(args.input_fields)?;
+        let return_field =
+            WrappedSchema(FFI_ArrowSchema::try_from(args.return_field.as_ref())?);
+        let ordering_fields = args
+            .ordering_fields
+            .iter()
+            .map(|f| f.as_ref())
+            .map(datafusion_proto::protobuf::Field::try_from)
+            .map(|v| v.map_err(DataFusionError::from))
+            .collect::<Result<Vec<_>>>()?
+            .into_iter()
+            .map(|proto_field| proto_field.encode_to_vec().into())
+            .collect();
+
+        // SAFETY: `self.udaf` outlives this call and was built by the provider;
+        // `name` is a borrowed string that lives until the call returns.
+        let fields = unsafe {
+            (self.udaf.state_fields)(
+                &self.udaf,
+                &name,
+                input_fields,
+                return_field,
+                ordering_fields,
+                args.is_distinct,
+            )
+        };
+
+        let fields = df_result!(fields)?
+            .into_iter()
+            .map(|field_bytes| {
+                datafusion_proto_common::Field::decode(field_bytes.as_ref())
+                    .map_err(|e| DataFusionError::Execution(e.to_string()))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        parse_proto_fields_to_fields(fields.iter())
+            .map(|fields| fields.into_iter().map(Arc::new).collect())
+            .map_err(|e| DataFusionError::Execution(e.to_string()))
+    }
+
+    fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool {
+        let args = match FFI_AccumulatorArgs::try_from(args) {
+            Ok(v) => v,
+            Err(e) => {
+                log::warn!("Attempting to convert accumulator arguments: {e}");
+                return false;
+            }
+        };
+
+        // SAFETY: `self.udaf` outlives this call and was built by the provider.
+        unsafe { (self.udaf.groups_accumulator_supported)(&self.udaf, args) }
+    }
+
+    fn create_groups_accumulator(
+        &self,
+        args: AccumulatorArgs,
+    ) -> Result<Box<dyn GroupsAccumulator>> {
+        let args = FFI_AccumulatorArgs::try_from(args)?;
+
+        // SAFETY: `self.udaf` outlives this call and was built by the provider.
+        let accumulator =
+            unsafe { (self.udaf.create_groups_accumulator)(&self.udaf, args) };
+
+        df_result!(accumulator).map(|accum| {
+            Box::new(ForeignGroupsAccumulator::from(accum)) as Box<dyn GroupsAccumulator>
+        })
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+
+    fn create_sliding_accumulator(
+        &self,
+        args: AccumulatorArgs,
+    ) -> Result<Box<dyn Accumulator>> {
+        let args = args.try_into()?;
+
+        // SAFETY: `self.udaf` outlives this call and was built by the provider.
+        let accumulator =
+            unsafe { (self.udaf.create_sliding_accumulator)(&self.udaf, args) };
+
+        df_result!(accumulator).map(|accum| {
+            Box::new(ForeignAccumulator::from(accum)) as Box<dyn Accumulator>
+        })
+    }
+
+    fn with_beneficial_ordering(
+        self: Arc<Self>,
+        beneficial_ordering: bool,
+    ) -> Result<Option<Arc<dyn AggregateUDFImpl>>> {
+        // SAFETY: `self.udaf` outlives this call and was built by the provider.
+        let result = unsafe {
+            (self.udaf.with_beneficial_ordering)(&self.udaf, beneficial_ordering)
+        };
+
+        let result = df_result!(result)?
+            .into_option()
+            .map(|func| ForeignAggregateUDF::try_from(&func))
+            .transpose()?;
+
+        Ok(result.map(|func| Arc::new(func) as Arc<dyn AggregateUDFImpl>))
+    }
+
+    fn order_sensitivity(&self) -> AggregateOrderSensitivity {
+        // SAFETY: `self.udaf` outlives this call and was built by the provider.
+        let sensitivity = unsafe { (self.udaf.order_sensitivity)(&self.udaf) };
+        sensitivity.into()
+    }
+
+    fn simplify(&self) -> Option<AggregateFunctionSimplification> {
+        None
+    }
+
+    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
+        let arg_types = vec_datatype_to_rvec_wrapped(arg_types)?;
+
+        // SAFETY: `self.udaf` outlives this call and was built by the provider.
+        let result_types = unsafe { (self.udaf.coerce_types)(&self.udaf, arg_types) };
+        let result_types = df_result!(result_types)?;
+
+        Ok(rvec_wrapped_to_vec_datatype(&result_types)?)
+    }
+}
+
+/// FFI-safe mirror of [`AggregateOrderSensitivity`].
+///
+/// With `#[repr(C)]` and implicit discriminants, the declaration order below
+/// fixes each variant's numeric value. Reordering variants would change the ABI.
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+#[allow(non_camel_case_types)]
+pub enum FFI_AggregateOrderSensitivity {
+    /// Mirrors `AggregateOrderSensitivity::Insensitive`.
+    Insensitive,
+    /// Mirrors `AggregateOrderSensitivity::HardRequirement`.
+    HardRequirement,
+    /// Mirrors `AggregateOrderSensitivity::Beneficial`.
+    Beneficial,
+}
+
+impl From<FFI_AggregateOrderSensitivity> for AggregateOrderSensitivity {
+    /// Decode the FFI representation back into DataFusion's native enum.
+    fn from(value: FFI_AggregateOrderSensitivity) -> Self {
+        use self::FFI_AggregateOrderSensitivity as Ffi;
+
+        match value {
+            Ffi::Insensitive => AggregateOrderSensitivity::Insensitive,
+            Ffi::HardRequirement => AggregateOrderSensitivity::HardRequirement,
+            Ffi::Beneficial => AggregateOrderSensitivity::Beneficial,
+        }
+    }
+}
+
+impl From<AggregateOrderSensitivity> for FFI_AggregateOrderSensitivity {
+    /// Encode DataFusion's native enum into its FFI-safe mirror.
+    fn from(value: AggregateOrderSensitivity) -> Self {
+        use self::AggregateOrderSensitivity as Native;
+
+        match value {
+            Native::Insensitive => FFI_AggregateOrderSensitivity::Insensitive,
+            Native::HardRequirement => FFI_AggregateOrderSensitivity::HardRequirement,
+            Native::Beneficial => FFI_AggregateOrderSensitivity::Beneficial,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::datatypes::Schema;
+    use datafusion::{
+        common::create_array,
+        functions_aggregate::sum::Sum,
+        physical_expr::{LexOrdering, PhysicalSortExpr},
+        physical_plan::expressions::col,
+        scalar::ScalarValue,
+    };
+
+    use super::*;
+
+    /// Round-trip `original_udaf` through `FFI_AggregateUDF` and back into a
+    /// native [`AggregateUDF`] backed by a `ForeignAggregateUDF`.
+    fn create_test_foreign_udaf(
+        original_udaf: impl AggregateUDFImpl + 'static,
+    ) -> Result<AggregateUDF> {
+        let original_udaf = Arc::new(AggregateUDF::from(original_udaf));
+
+        let local_udaf: FFI_AggregateUDF = Arc::clone(&original_udaf).into();
+
+        let foreign_udaf: ForeignAggregateUDF = (&local_udaf).try_into()?;
+        Ok(foreign_udaf.into())
+    }
+
+    #[test]
+    fn test_round_trip_udaf() -> Result<()> {
+        let original_udaf = Sum::new();
+        let original_name = original_udaf.name().to_owned();
+        let original_udaf = Arc::new(AggregateUDF::from(original_udaf));
+
+        // Convert to FFI format
+        let local_udaf: FFI_AggregateUDF = Arc::clone(&original_udaf).into();
+
+        // Convert back to native format
+        let foreign_udaf: ForeignAggregateUDF = (&local_udaf).try_into()?;
+        let foreign_udaf: AggregateUDF = foreign_udaf.into();
+
+        assert_eq!(original_name, foreign_udaf.name());
+        Ok(())
+    }
+
+    #[test]
+    fn test_foreign_udaf_aliases() -> Result<()> {
+        // Attach the alias on the provider side so that it has to travel through
+        // the `aliases` field of `FFI_AggregateUDF` to reach the foreign wrapper.
+        let original_udaf =
+            Arc::new(AggregateUDF::from(Sum::new()).with_aliases(["my_function"]));
+        let local_udaf: FFI_AggregateUDF = Arc::clone(&original_udaf).into();
+        let foreign_udaf: ForeignAggregateUDF = (&local_udaf).try_into()?;
+
+        assert!(foreign_udaf
+            .aliases()
+            .iter()
+            .any(|alias| alias == "my_function"));
+
+        let foreign_udaf: AggregateUDF = foreign_udaf.into();
+        let return_type = foreign_udaf.return_type(&[DataType::Float64])?;
+        assert_eq!(return_type, DataType::Float64);
+        Ok(())
+    }
+
+    #[test]
+    fn test_foreign_udaf_accumulator() -> Result<()> {
+        let foreign_udaf = create_test_foreign_udaf(Sum::new())?;
+
+        let schema = Schema::new(vec![Field::new("a", DataType::Float64, true)]);
+        let acc_args = AccumulatorArgs {
+            return_field: Field::new("f", DataType::Float64, true).into(),
+            schema: &schema,
+            ignore_nulls: true,
+            ordering_req: &LexOrdering::new(vec![PhysicalSortExpr {
+                expr: col("a", &schema)?,
+                options: Default::default(),
+            }]),
+            is_reversed: false,
+            name: "round_trip",
+            is_distinct: true,
+            exprs: &[col("a", &schema)?],
+        };
+        let mut accumulator = foreign_udaf.accumulator(acc_args)?;
+        let values = create_array!(Float64, vec![10., 20., 30., 40., 50.]);
+        accumulator.update_batch(&[values])?;
+        let resultant_value = accumulator.evaluate()?;
+        assert_eq!(resultant_value, ScalarValue::Float64(Some(150.)));
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_beneficial_ordering() -> Result<()> {
+        let foreign_udaf = create_test_foreign_udaf(
+            datafusion::functions_aggregate::first_last::FirstValue::new(),
+        )?;
+
+        let foreign_udaf = foreign_udaf.with_beneficial_ordering(true)?.unwrap();
+
+        assert_eq!(
+            foreign_udaf.order_sensitivity(),
+            AggregateOrderSensitivity::Beneficial
+        );
+
+        let a_field = Arc::new(Field::new("a", DataType::Float64, true));
+        let state_fields = foreign_udaf.state_fields(StateFieldsArgs {
+            name: "a",
+            input_fields: &[Field::new("f", DataType::Float64, true).into()],
+            return_field: Field::new("f", DataType::Float64, true).into(),
+            ordering_fields: &[Arc::clone(&a_field)],
+            is_distinct: false,
+        })?;
+
+        assert_eq!(state_fields.len(), 3);
+        assert_eq!(state_fields[1], a_field);
+        Ok(())
+    }
+
+    #[test]
+    fn test_sliding_accumulator() -> Result<()> {
+        let foreign_udaf = create_test_foreign_udaf(Sum::new())?;
+
+        let schema = Schema::new(vec![Field::new("a", DataType::Float64, true)]);
+        let acc_args = AccumulatorArgs {
+            return_field: Field::new("f", DataType::Float64, true).into(),
+            schema: &schema,
+            ignore_nulls: true,
+            ordering_req: &LexOrdering::new(vec![PhysicalSortExpr {
+                expr: col("a", &schema)?,
+                options: Default::default(),
+            }]),
+            is_reversed: false,
+            name: "round_trip",
+            is_distinct: true,
+            exprs: &[col("a", &schema)?],
+        };
+
+        let mut accumulator = foreign_udaf.create_sliding_accumulator(acc_args)?;
+        let values = create_array!(Float64, vec![10., 20., 30., 40., 50.]);
+        accumulator.update_batch(&[values])?;
+        let resultant_value = accumulator.evaluate()?;
+        assert_eq!(resultant_value, ScalarValue::Float64(Some(150.)));
+
+        Ok(())
+    }
+
+    fn test_round_trip_order_sensitivity(sensitivity: AggregateOrderSensitivity) {
+        let ffi_sensitivity: FFI_AggregateOrderSensitivity = sensitivity.into();
+        let round_trip_sensitivity: AggregateOrderSensitivity = ffi_sensitivity.into();
+
+        assert_eq!(sensitivity, round_trip_sensitivity);
+    }
+
+    #[test]
+    fn test_round_trip_all_order_sensitivities() {
+        test_round_trip_order_sensitivity(AggregateOrderSensitivity::Insensitive);
+        test_round_trip_order_sensitivity(AggregateOrderSensitivity::HardRequirement);
+        test_round_trip_order_sensitivity(AggregateOrderSensitivity::Beneficial);
+    }
+}
diff --git a/datafusion/ffi/src/util.rs b/datafusion/ffi/src/util.rs
index 3eb57963b4..abe369c572 100644
--- a/datafusion/ffi/src/util.rs
+++ b/datafusion/ffi/src/util.rs
@@ -22,7 +22,7 @@ use arrow::{datatypes::DataType, ffi::FFI_ArrowSchema};
 use arrow_schema::FieldRef;
 use std::sync::Arc;
 
-/// This macro is a helpful conversion utility to conver from an 
abi_stable::RResult to a
+/// This macro is a helpful conversion utility to convert from an 
abi_stable::RResult to a
 /// DataFusion result.
 #[macro_export]
 macro_rules! df_result {
diff --git a/datafusion/ffi/tests/ffi_integration.rs 
b/datafusion/ffi/tests/ffi_integration.rs
index c6df324e9a..1ef16fbaa4 100644
--- a/datafusion/ffi/tests/ffi_integration.rs
+++ b/datafusion/ffi/tests/ffi_integration.rs
@@ -19,7 +19,6 @@
 /// when the feature integtation-tests is built
 #[cfg(feature = "integration-tests")]
 mod tests {
-
     use datafusion::error::{DataFusionError, Result};
     use datafusion::prelude::SessionContext;
     use datafusion_ffi::catalog_provider::ForeignCatalogProvider;
diff --git a/datafusion/ffi/tests/ffi_udaf.rs b/datafusion/ffi/tests/ffi_udaf.rs
new file mode 100644
index 0000000000..31b1f47391
--- /dev/null
+++ b/datafusion/ffi/tests/ffi_udaf.rs
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Add an additional module here for convenience to scope this to only
+/// when the feature integration-tests is built
+#[cfg(feature = "integration-tests")]
+mod tests {
+    use arrow::array::Float64Array;
+    use datafusion::common::record_batch;
+    use datafusion::error::{DataFusionError, Result};
+    use datafusion::logical_expr::AggregateUDF;
+    use datafusion::prelude::{col, SessionContext};
+
+    use datafusion_ffi::tests::utils::get_module;
+    use datafusion_ffi::udaf::ForeignAggregateUDF;
+
+    /// Absolute tolerance used when comparing floating point aggregates.
+    const TOLERANCE: f64 = 1e-5;
+
+    #[tokio::test]
+    async fn test_ffi_udaf() -> Result<()> {
+        let module = get_module()?;
+
+        let ffi_sum_func = module.create_sum_udaf().ok_or_else(|| {
+            DataFusionError::NotImplemented(
+                "External module failed to implement create_sum_udaf".to_string(),
+            )
+        })?();
+        let foreign_sum_func: ForeignAggregateUDF = (&ffi_sum_func).try_into()?;
+
+        let udaf: AggregateUDF = foreign_sum_func.into();
+
+        let ctx = SessionContext::default();
+        let record_batch = record_batch!(
+            ("a", Int32, vec![1, 2, 2, 4, 4, 4, 4]),
+            ("b", Float64, vec![1.0, 2.0, 2.0, 4.0, 4.0, 4.0, 4.0])
+        )?;
+
+        let df = ctx.read_batch(record_batch)?;
+
+        let df = df
+            .aggregate(
+                vec![col("a")],
+                vec![udaf.call(vec![col("b")]).alias("sum_b")],
+            )?
+            .sort_by(vec![col("a")])?;
+
+        let result = df.collect().await?;
+
+        let expected = record_batch!(
+            ("a", Int32, vec![1, 2, 4]),
+            ("sum_b", Float64, vec![1.0, 4.0, 16.0])
+        )?;
+
+        assert_eq!(result[0], expected);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_ffi_grouping_udaf() -> Result<()> {
+        let module = get_module()?;
+
+        let ffi_stddev_func = module.create_stddev_udaf().ok_or_else(|| {
+            DataFusionError::NotImplemented(
+                "External module failed to implement create_stddev_udaf".to_string(),
+            )
+        })?();
+        let foreign_stddev_func: ForeignAggregateUDF = (&ffi_stddev_func).try_into()?;
+
+        let udaf: AggregateUDF = foreign_stddev_func.into();
+
+        let ctx = SessionContext::default();
+        let record_batch = record_batch!(
+            ("a", Int32, vec![1, 2, 2, 4, 4, 4, 4]),
+            (
+                "b",
+                Float64,
+                vec![
+                    1.0,
+                    2.0,
+                    2.0 + 2.0_f64.sqrt(),
+                    4.0,
+                    4.0,
+                    4.0 + 3.0_f64.sqrt(),
+                    4.0 + 3.0_f64.sqrt()
+                ]
+            )
+        )?;
+
+        let df = ctx.read_batch(record_batch)?;
+
+        let df = df
+            .aggregate(
+                vec![col("a")],
+                vec![udaf.call(vec![col("b")]).alias("stddev_b")],
+            )?
+            .sort_by(vec![col("a")])?;
+
+        let result = df.collect().await?;
+        let stddev = result[0]
+            .column_by_name("stddev_b")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Float64Array>()
+            .unwrap();
+        let values: &[f64] = stddev.values();
+
+        assert_eq!(values.len(), 3);
+        // Group `a = 1` has a single row, and its result is expected to be NaN.
+        assert!(values[0].is_nan());
+        // Groups `a = 2` and `a = 4` were built so that their sample standard
+        // deviation is exactly 1.0. Compare the absolute difference, because a
+        // bare `x - 1.0 < tol` would accept any value below 1.0.
+        assert!((values[1] - 1.0).abs() < TOLERANCE);
+        assert!((values[2] - 1.0).abs() < TOLERANCE);
+
+        Ok(())
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org


Reply via email to