This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 75c7da5da3 Pass ConfigOptions to scalar UDFs via FFI (#20454)
75c7da5da3 is described below

commit 75c7da5da33915d698de2bbe412fca2e104a2aa4
Author: Tim Saucer <[email protected]>
AuthorDate: Tue Mar 10 12:34:20 2026 +0100

    Pass ConfigOptions to scalar UDFs via FFI (#20454)
    
    ## Which issue does this PR close?
    
    - Closes https://github.com/apache/datafusion/issues/17035
    
    ## Rationale for this change
    
    Now that we have a proper `FFI_ConfigOptions`, we can pass the config
    options to scalar UDFs via FFI.
    
    ## What changes are included in this PR?
    
    Instead of passing default options, pass in converted config options
    from the input.
    
    Also did a drive-by cleanup: switched to using `FFI_ColumnarValue`,
    since it is now available.
    
    ## Are these changes tested?
    
    Unit test added.
    
    ## Are there any user-facing changes?
    
    This is a breaking API change, but not one that users will interact with
    directly. It breaks the ABI for FFI libraries, which is currently
    unstable.
---
 datafusion/ffi/src/tests/mod.rs           |  3 ++
 datafusion/ffi/src/tests/udf_udaf_udwf.rs | 44 +++++++++++++++++++++++++++-
 datafusion/ffi/src/udf/mod.rs             | 39 ++++++++++++-------------
 datafusion/ffi/tests/ffi_udf.rs           | 48 +++++++++++++++++++++++++++++--
 4 files changed, 110 insertions(+), 24 deletions(-)

diff --git a/datafusion/ffi/src/tests/mod.rs b/datafusion/ffi/src/tests/mod.rs
index cbee5febdb..f594993e35 100644
--- a/datafusion/ffi/src/tests/mod.rs
+++ b/datafusion/ffi/src/tests/mod.rs
@@ -84,6 +84,8 @@ pub struct ForeignLibraryModule {
 
     pub create_nullary_udf: extern "C" fn() -> FFI_ScalarUDF,
 
+    pub create_timezone_udf: extern "C" fn() -> FFI_ScalarUDF,
+
     pub create_table_function:
         extern "C" fn(FFI_LogicalExtensionCodec) -> FFI_TableFunction,
 
@@ -157,6 +159,7 @@ pub fn get_foreign_library_module() -> ForeignLibraryModuleRef {
         create_table_factory: construct_table_provider_factory,
         create_scalar_udf: create_ffi_abs_func,
         create_nullary_udf: create_ffi_random_func,
+        create_timezone_udf: udf_udaf_udwf::create_timezone_func,
         create_table_function: create_ffi_table_func,
         create_sum_udaf: create_ffi_sum_func,
         create_stddev_udaf: create_ffi_stddev_func,
diff --git a/datafusion/ffi/src/tests/udf_udaf_udwf.rs b/datafusion/ffi/src/tests/udf_udaf_udwf.rs
index d88ddfb28d..b9ab20b115 100644
--- a/datafusion/ffi/src/tests/udf_udaf_udwf.rs
+++ b/datafusion/ffi/src/tests/udf_udaf_udwf.rs
@@ -20,9 +20,10 @@ use std::sync::Arc;
 
 use arrow_schema::DataType;
 use datafusion_catalog::TableFunctionImpl;
+use datafusion_common::ScalarValue;
 use datafusion_expr::{
     AggregateUDF, ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
-    WindowUDF,
+    Volatility, WindowUDF,
 };
 use datafusion_functions::math::abs::AbsFunc;
 use datafusion_functions::math::random::RandomFunc;
@@ -78,6 +79,47 @@ pub(crate) extern "C" fn create_ffi_random_func() -> FFI_ScalarUDF {
     udf.into()
 }
 
+#[derive(Debug, PartialEq, Eq, Hash)]
+struct TimeZoneUDF {
+    signature: Signature,
+}
+
+impl ScalarUDFImpl for TimeZoneUDF {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+    fn name(&self) -> &str {
+        "TimeZoneUDF"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(
+        &self,
+        _arg_types: &[DataType],
+    ) -> datafusion_common::Result<DataType> {
+        Ok(DataType::Utf8)
+    }
+
+    fn invoke_with_args(
+        &self,
+        args: ScalarFunctionArgs,
+    ) -> datafusion_common::Result<ColumnarValue> {
+        let tz = args.config_options.execution.time_zone.clone();
+        Ok(ColumnarValue::Scalar(ScalarValue::from(tz)))
+    }
+}
+
+pub(crate) extern "C" fn create_timezone_func() -> FFI_ScalarUDF {
+    let udf: Arc<ScalarUDF> = Arc::new(ScalarUDF::from(TimeZoneUDF {
+        signature: Signature::uniform(1, vec![DataType::Utf8], Volatility::Stable),
+    }));
+
+    udf.into()
+}
+
 pub(crate) extern "C" fn create_ffi_table_func(
     codec: FFI_LogicalExtensionCodec,
 ) -> FFI_TableFunction {
diff --git a/datafusion/ffi/src/udf/mod.rs b/datafusion/ffi/src/udf/mod.rs
index 94be5f38ea..c6ef0f2a50 100644
--- a/datafusion/ffi/src/udf/mod.rs
+++ b/datafusion/ffi/src/udf/mod.rs
@@ -20,8 +20,8 @@ use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
 use abi_stable::StableAbi;
-use abi_stable::std_types::{RResult, RString, RVec};
-use arrow::array::ArrayRef;
+use abi_stable::std_types::{RString, RVec};
+use arrow::array::Array;
 use arrow::datatypes::{DataType, Field};
 use arrow::error::ArrowError;
 use arrow::ffi::{FFI_ArrowSchema, from_ffi, to_ffi};
@@ -38,6 +38,8 @@ use return_type_args::{
 };
 
 use crate::arrow_wrappers::{WrappedArray, WrappedSchema};
+use crate::config::FFI_ConfigOptions;
+use crate::expr::columnar_value::FFI_ColumnarValue;
 use crate::util::{
     FFIResult, rvec_wrapped_to_vec_datatype, vec_datatype_to_rvec_wrapped,
 };
@@ -73,7 +75,8 @@ pub struct FFI_ScalarUDF {
         arg_fields: RVec<WrappedSchema>,
         num_rows: usize,
         return_field: WrappedSchema,
-    ) -> FFIResult<WrappedArray>,
+        config_options: FFI_ConfigOptions,
+    ) -> FFIResult<FFI_ColumnarValue>,
 
     /// See [`ScalarUDFImpl`] for details on short_circuits
     pub short_circuits: bool,
@@ -159,7 +162,8 @@ unsafe extern "C" fn invoke_with_args_fn_wrapper(
     arg_fields: RVec<WrappedSchema>,
     number_rows: usize,
     return_field: WrappedSchema,
-) -> FFIResult<WrappedArray> {
+    config_options: FFI_ConfigOptions,
+) -> FFIResult<FFI_ColumnarValue> {
     unsafe {
         let args = args
             .into_iter()
@@ -181,28 +185,22 @@ unsafe extern "C" fn invoke_with_args_fn_wrapper(
             })
             .collect::<Result<Vec<FieldRef>>>();
         let arg_fields = rresult_return!(arg_fields);
+        let config_options = rresult_return!(ConfigOptions::try_from(config_options));
+        let config_options = Arc::new(config_options);
 
         let args = ScalarFunctionArgs {
             args,
             arg_fields,
             number_rows,
             return_field,
-            // TODO: pass config options: https://github.com/apache/datafusion/issues/17035
-            config_options: Arc::new(ConfigOptions::default()),
+            config_options,
         };
 
-        let result = rresult_return!(
+        rresult!(
             udf.inner()
                 .invoke_with_args(args)
-                .and_then(|r| r.to_array(number_rows))
-        );
-
-        let (result_array, result_schema) = rresult_return!(to_ffi(&result.to_data()));
-
-        RResult::ROk(WrappedArray {
-            array: result_array,
-            schema: WrappedSchema(result_schema),
-        })
+                .and_then(FFI_ColumnarValue::try_from)
+        )
     }
 }
 
@@ -366,8 +364,7 @@ impl ScalarUDFImpl for ForeignScalarUDF {
             arg_fields,
             number_rows,
             return_field,
-            // TODO: pass config options: https://github.com/apache/datafusion/issues/17035
-            config_options: _config_options,
+            config_options,
         } = invoke_args;
 
         let args = args
@@ -396,6 +393,7 @@ impl ScalarUDFImpl for ForeignScalarUDF {
 
         let return_field = return_field.as_ref().clone();
         let return_field = WrappedSchema(FFI_ArrowSchema::try_from(return_field)?);
+        let config_options = config_options.as_ref().into();
 
         let result = unsafe {
             (self.udf.invoke_with_args)(
@@ -404,13 +402,12 @@ impl ScalarUDFImpl for ForeignScalarUDF {
                 arg_fields,
                 number_rows,
                 return_field,
+                config_options,
             )
         };
 
         let result = df_result!(result)?;
-        let result_array: ArrayRef = result.try_into()?;
-
-        Ok(ColumnarValue::Array(result_array))
+        result.try_into()
     }
 
     fn aliases(&self) -> &[String] {
diff --git a/datafusion/ffi/tests/ffi_udf.rs b/datafusion/ffi/tests/ffi_udf.rs
index c659e27f02..02dfba599f 100644
--- a/datafusion/ffi/tests/ffi_udf.rs
+++ b/datafusion/ffi/tests/ffi_udf.rs
@@ -19,15 +19,17 @@
 /// when the feature integration-tests is built
 #[cfg(feature = "integration-tests")]
 mod tests {
-    use std::sync::Arc;
-
+    use arrow::array::{Array, AsArray};
     use arrow::datatypes::DataType;
     use datafusion::common::record_batch;
     use datafusion::error::{DataFusionError, Result};
     use datafusion::logical_expr::{ScalarUDF, ScalarUDFImpl};
     use datafusion::prelude::{SessionContext, col};
+    use datafusion_execution::config::SessionConfig;
+    use datafusion_expr::lit;
     use datafusion_ffi::tests::create_record_batch;
     use datafusion_ffi::tests::utils::get_module;
+    use std::sync::Arc;
 
     /// This test validates that we can load an external module and use a scalar
     /// udf defined in it via the foreign function interface. In this case we are
@@ -100,4 +102,46 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_config_on_scalar_udf() -> Result<()> {
+        let module = get_module()?;
+
+        let ffi_udf =
+            module
+                .create_timezone_udf()
+                .ok_or(DataFusionError::NotImplemented(
+                    "External module failed to implement create_timezone_udf".to_string(),
+                ))?();
+        let foreign_udf: Arc<dyn ScalarUDFImpl> = (&ffi_udf).into();
+
+        let udf = ScalarUDF::new_from_shared_impl(foreign_udf);
+
+        let ctx = SessionContext::default();
+
+        let df = ctx
+            .read_empty()?
+            .select(vec![udf.call(vec![lit("a")]).alias("a")])?;
+
+        let result = df.collect().await?;
+        assert!(result[0].column(0).as_string::<i32>().is_null(0));
+
+        let mut config = SessionConfig::new();
+        config.options_mut().execution.time_zone = Some("AEST".into());
+
+        let ctx = SessionContext::new_with_config(config);
+
+        let df = ctx
+            .read_empty()?
+            .select(vec![udf.call(vec![lit("a")]).alias("a")])?;
+
+        let result = df.collect().await?;
+
+        assert!(result.len() == 1);
+        assert!(!result[0].column(0).as_string::<i32>().is_null(0));
+        let result = result[0].column(0).as_string::<i32>().value(0);
+        assert_eq!(result, "AEST");
+
+        Ok(())
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to