This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 75c7da5da3 Pass ConfigOptions to scalar UDFs via FFI (#20454)
75c7da5da3 is described below
commit 75c7da5da33915d698de2bbe412fca2e104a2aa4
Author: Tim Saucer <[email protected]>
AuthorDate: Tue Mar 10 12:34:20 2026 +0100
Pass ConfigOptions to scalar UDFs via FFI (#20454)
## Which issue does this PR close?
- Closes https://github.com/apache/datafusion/issues/17035
## Rationale for this change
Now that we have a proper `FFI_ConfigOptions`, we can pass config options to
scalar UDFs via FFI.
## What changes are included in this PR?
Instead of passing default options, pass the config options from the input
`ScalarFunctionArgs`, converted to `FFI_ConfigOptions`.
Also did a drive-by cleanup: switched to using `FFI_ColumnarValue`,
since it is now available.
## Are these changes tested?
Unit test added.
## Are there any user-facing changes?
This is a breaking API change, but not one that users will interact with
directly. It breaks the ABI for FFI libraries, which is currently
unstable.
---
datafusion/ffi/src/tests/mod.rs | 3 ++
datafusion/ffi/src/tests/udf_udaf_udwf.rs | 44 +++++++++++++++++++++++++++-
datafusion/ffi/src/udf/mod.rs | 39 ++++++++++++-------------
datafusion/ffi/tests/ffi_udf.rs | 48 +++++++++++++++++++++++++++++--
4 files changed, 110 insertions(+), 24 deletions(-)
diff --git a/datafusion/ffi/src/tests/mod.rs b/datafusion/ffi/src/tests/mod.rs
index cbee5febdb..f594993e35 100644
--- a/datafusion/ffi/src/tests/mod.rs
+++ b/datafusion/ffi/src/tests/mod.rs
@@ -84,6 +84,8 @@ pub struct ForeignLibraryModule {
pub create_nullary_udf: extern "C" fn() -> FFI_ScalarUDF,
+ pub create_timezone_udf: extern "C" fn() -> FFI_ScalarUDF,
+
pub create_table_function:
extern "C" fn(FFI_LogicalExtensionCodec) -> FFI_TableFunction,
@@ -157,6 +159,7 @@ pub fn get_foreign_library_module() ->
ForeignLibraryModuleRef {
create_table_factory: construct_table_provider_factory,
create_scalar_udf: create_ffi_abs_func,
create_nullary_udf: create_ffi_random_func,
+ create_timezone_udf: udf_udaf_udwf::create_timezone_func,
create_table_function: create_ffi_table_func,
create_sum_udaf: create_ffi_sum_func,
create_stddev_udaf: create_ffi_stddev_func,
diff --git a/datafusion/ffi/src/tests/udf_udaf_udwf.rs
b/datafusion/ffi/src/tests/udf_udaf_udwf.rs
index d88ddfb28d..b9ab20b115 100644
--- a/datafusion/ffi/src/tests/udf_udaf_udwf.rs
+++ b/datafusion/ffi/src/tests/udf_udaf_udwf.rs
@@ -20,9 +20,10 @@ use std::sync::Arc;
use arrow_schema::DataType;
use datafusion_catalog::TableFunctionImpl;
+use datafusion_common::ScalarValue;
use datafusion_expr::{
AggregateUDF, ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl,
Signature,
- WindowUDF,
+ Volatility, WindowUDF,
};
use datafusion_functions::math::abs::AbsFunc;
use datafusion_functions::math::random::RandomFunc;
@@ -78,6 +79,47 @@ pub(crate) extern "C" fn create_ffi_random_func() ->
FFI_ScalarUDF {
udf.into()
}
+#[derive(Debug, PartialEq, Eq, Hash)]
+struct TimeZoneUDF {
+ signature: Signature,
+}
+
+impl ScalarUDFImpl for TimeZoneUDF {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+ fn name(&self) -> &str {
+ "TimeZoneUDF"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(
+ &self,
+ _arg_types: &[DataType],
+ ) -> datafusion_common::Result<DataType> {
+ Ok(DataType::Utf8)
+ }
+
+ fn invoke_with_args(
+ &self,
+ args: ScalarFunctionArgs,
+ ) -> datafusion_common::Result<ColumnarValue> {
+ let tz = args.config_options.execution.time_zone.clone();
+ Ok(ColumnarValue::Scalar(ScalarValue::from(tz)))
+ }
+}
+
+pub(crate) extern "C" fn create_timezone_func() -> FFI_ScalarUDF {
+ let udf: Arc<ScalarUDF> = Arc::new(ScalarUDF::from(TimeZoneUDF {
+ signature: Signature::uniform(1, vec![DataType::Utf8],
Volatility::Stable),
+ }));
+
+ udf.into()
+}
+
pub(crate) extern "C" fn create_ffi_table_func(
codec: FFI_LogicalExtensionCodec,
) -> FFI_TableFunction {
diff --git a/datafusion/ffi/src/udf/mod.rs b/datafusion/ffi/src/udf/mod.rs
index 94be5f38ea..c6ef0f2a50 100644
--- a/datafusion/ffi/src/udf/mod.rs
+++ b/datafusion/ffi/src/udf/mod.rs
@@ -20,8 +20,8 @@ use std::hash::{Hash, Hasher};
use std::sync::Arc;
use abi_stable::StableAbi;
-use abi_stable::std_types::{RResult, RString, RVec};
-use arrow::array::ArrayRef;
+use abi_stable::std_types::{RString, RVec};
+use arrow::array::Array;
use arrow::datatypes::{DataType, Field};
use arrow::error::ArrowError;
use arrow::ffi::{FFI_ArrowSchema, from_ffi, to_ffi};
@@ -38,6 +38,8 @@ use return_type_args::{
};
use crate::arrow_wrappers::{WrappedArray, WrappedSchema};
+use crate::config::FFI_ConfigOptions;
+use crate::expr::columnar_value::FFI_ColumnarValue;
use crate::util::{
FFIResult, rvec_wrapped_to_vec_datatype, vec_datatype_to_rvec_wrapped,
};
@@ -73,7 +75,8 @@ pub struct FFI_ScalarUDF {
arg_fields: RVec<WrappedSchema>,
num_rows: usize,
return_field: WrappedSchema,
- ) -> FFIResult<WrappedArray>,
+ config_options: FFI_ConfigOptions,
+ ) -> FFIResult<FFI_ColumnarValue>,
/// See [`ScalarUDFImpl`] for details on short_circuits
pub short_circuits: bool,
@@ -159,7 +162,8 @@ unsafe extern "C" fn invoke_with_args_fn_wrapper(
arg_fields: RVec<WrappedSchema>,
number_rows: usize,
return_field: WrappedSchema,
-) -> FFIResult<WrappedArray> {
+ config_options: FFI_ConfigOptions,
+) -> FFIResult<FFI_ColumnarValue> {
unsafe {
let args = args
.into_iter()
@@ -181,28 +185,22 @@ unsafe extern "C" fn invoke_with_args_fn_wrapper(
})
.collect::<Result<Vec<FieldRef>>>();
let arg_fields = rresult_return!(arg_fields);
+ let config_options =
rresult_return!(ConfigOptions::try_from(config_options));
+ let config_options = Arc::new(config_options);
let args = ScalarFunctionArgs {
args,
arg_fields,
number_rows,
return_field,
- // TODO: pass config options:
https://github.com/apache/datafusion/issues/17035
- config_options: Arc::new(ConfigOptions::default()),
+ config_options,
};
- let result = rresult_return!(
+ rresult!(
udf.inner()
.invoke_with_args(args)
- .and_then(|r| r.to_array(number_rows))
- );
-
- let (result_array, result_schema) =
rresult_return!(to_ffi(&result.to_data()));
-
- RResult::ROk(WrappedArray {
- array: result_array,
- schema: WrappedSchema(result_schema),
- })
+ .and_then(FFI_ColumnarValue::try_from)
+ )
}
}
@@ -366,8 +364,7 @@ impl ScalarUDFImpl for ForeignScalarUDF {
arg_fields,
number_rows,
return_field,
- // TODO: pass config options:
https://github.com/apache/datafusion/issues/17035
- config_options: _config_options,
+ config_options,
} = invoke_args;
let args = args
@@ -396,6 +393,7 @@ impl ScalarUDFImpl for ForeignScalarUDF {
let return_field = return_field.as_ref().clone();
let return_field =
WrappedSchema(FFI_ArrowSchema::try_from(return_field)?);
+ let config_options = config_options.as_ref().into();
let result = unsafe {
(self.udf.invoke_with_args)(
@@ -404,13 +402,12 @@ impl ScalarUDFImpl for ForeignScalarUDF {
arg_fields,
number_rows,
return_field,
+ config_options,
)
};
let result = df_result!(result)?;
- let result_array: ArrayRef = result.try_into()?;
-
- Ok(ColumnarValue::Array(result_array))
+ result.try_into()
}
fn aliases(&self) -> &[String] {
diff --git a/datafusion/ffi/tests/ffi_udf.rs b/datafusion/ffi/tests/ffi_udf.rs
index c659e27f02..02dfba599f 100644
--- a/datafusion/ffi/tests/ffi_udf.rs
+++ b/datafusion/ffi/tests/ffi_udf.rs
@@ -19,15 +19,17 @@
/// when the feature integration-tests is built
#[cfg(feature = "integration-tests")]
mod tests {
- use std::sync::Arc;
-
+ use arrow::array::{Array, AsArray};
use arrow::datatypes::DataType;
use datafusion::common::record_batch;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::{ScalarUDF, ScalarUDFImpl};
use datafusion::prelude::{SessionContext, col};
+ use datafusion_execution::config::SessionConfig;
+ use datafusion_expr::lit;
use datafusion_ffi::tests::create_record_batch;
use datafusion_ffi::tests::utils::get_module;
+ use std::sync::Arc;
/// This test validates that we can load an external module and use a
scalar
/// udf defined in it via the foreign function interface. In this case we
are
@@ -100,4 +102,46 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn test_config_on_scalar_udf() -> Result<()> {
+ let module = get_module()?;
+
+ let ffi_udf =
+ module
+ .create_timezone_udf()
+ .ok_or(DataFusionError::NotImplemented(
+ "External module failed to implement
create_timezone_udf".to_string(),
+ ))?();
+ let foreign_udf: Arc<dyn ScalarUDFImpl> = (&ffi_udf).into();
+
+ let udf = ScalarUDF::new_from_shared_impl(foreign_udf);
+
+ let ctx = SessionContext::default();
+
+ let df = ctx
+ .read_empty()?
+ .select(vec![udf.call(vec![lit("a")]).alias("a")])?;
+
+ let result = df.collect().await?;
+ assert!(result[0].column(0).as_string::<i32>().is_null(0));
+
+ let mut config = SessionConfig::new();
+ config.options_mut().execution.time_zone = Some("AEST".into());
+
+ let ctx = SessionContext::new_with_config(config);
+
+ let df = ctx
+ .read_empty()?
+ .select(vec![udf.call(vec![lit("a")]).alias("a")])?;
+
+ let result = df.collect().await?;
+
+ assert!(result.len() == 1);
+ assert!(!result[0].column(0).as_string::<i32>().is_null(0));
+ let result = result[0].column(0).as_string::<i32>().value(0);
+ assert_eq!(result, "AEST");
+
+ Ok(())
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]