This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new a9c090141d Add support for FFI config extensions (#19469)
a9c090141d is described below
commit a9c090141d4f47221f0fc93edcf5fe2f9dfe5a98
Author: Tim Saucer <[email protected]>
AuthorDate: Tue Feb 24 08:18:02 2026 -0500
Add support for FFI config extensions (#19469)
## Which issue does this PR close?
This addresses part of https://github.com/apache/datafusion/issues/17035
This is also a blocker for
https://github.com/apache/datafusion/issues/20450
## Rationale for this change
Currently we cannot support user-defined configuration extensions via
FFI. Much of the infrastructure for adding and extracting custom
extensions relies on knowing the concrete types of the extensions, which
is not supported in FFI. This PR adds an implementation of
configuration extensions that can be used across an FFI boundary.
## What changes are included in this PR?
- Implement `FFI_ExtensionOptions`.
- Update `ConfigOptions` and `TableOptions` to fall back to the `datafusion_ffi`
namespace when setting a value whose own namespace is not registered
- Add unit test
## Are these changes tested?
Unit test added.
Also tested against `datafusion-python` locally. With this change, the
following test passes, using a simple Python-exposed `MyConfig` that I
created:
```python
from datafusion import SessionConfig
from datafusion_ffi_example import MyConfig
def test_catalog_provider():
    config = MyConfig()
    config = SessionConfig().with_extension(config)
    config.set("my_config.baz_count", "42")
```
## Are there any user-facing changes?
New addition only.
---
datafusion/common/src/config.rs | 35 ++-
datafusion/ffi/src/config/extension_options.rs | 288 +++++++++++++++++++++++++
datafusion/ffi/src/config/mod.rs | 169 +++++++++++++++
datafusion/ffi/src/lib.rs | 1 +
datafusion/ffi/src/session/config.rs | 39 +---
datafusion/ffi/src/tests/config.rs | 51 +++++
datafusion/ffi/src/tests/mod.rs | 6 +
datafusion/ffi/tests/ffi_config.rs | 124 +++++++++++
8 files changed, 678 insertions(+), 35 deletions(-)
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index dad12c1c6b..d71af206c7 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -1256,7 +1256,7 @@ impl<'a> TryInto<arrow::util::display::FormatOptions<'a>>
for &'a FormatOptions
}
/// A key value pair, with a corresponding description
-#[derive(Debug, Hash, PartialEq, Eq)]
+#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct ConfigEntry {
/// A unique string to identify this config value
pub key: String,
@@ -1352,6 +1352,10 @@ impl ConfigField for ConfigOptions {
}
}
+/// This namespace is reserved for interacting with Foreign Function Interface
+/// (FFI) based configuration extensions.
+pub const DATAFUSION_FFI_CONFIG_NAMESPACE: &str = "datafusion_ffi";
+
impl ConfigOptions {
/// Creates a new [`ConfigOptions`] with default values
pub fn new() -> Self {
@@ -1366,12 +1370,12 @@ impl ConfigOptions {
/// Set a configuration option
pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
- let Some((prefix, key)) = key.split_once('.') else {
+ let Some((mut prefix, mut inner_key)) = key.split_once('.') else {
return _config_err!("could not find config namespace for key
\"{key}\"");
};
if prefix == "datafusion" {
- if key == "optimizer.enable_dynamic_filter_pushdown" {
+ if inner_key == "optimizer.enable_dynamic_filter_pushdown" {
let bool_value = value.parse::<bool>().map_err(|e| {
DataFusionError::Configuration(format!(
"Failed to parse '{value}' as bool: {e}",
@@ -1386,13 +1390,23 @@ impl ConfigOptions {
}
return Ok(());
}
- return ConfigField::set(self, key, value);
+ return ConfigField::set(self, inner_key, value);
+ }
+
+ if !self.extensions.0.contains_key(prefix)
+ && self
+ .extensions
+ .0
+ .contains_key(DATAFUSION_FFI_CONFIG_NAMESPACE)
+ {
+ inner_key = key;
+ prefix = DATAFUSION_FFI_CONFIG_NAMESPACE;
}
let Some(e) = self.extensions.0.get_mut(prefix) else {
return _config_err!("Could not find config namespace
\"{prefix}\"");
};
- e.0.set(key, value)
+ e.0.set(inner_key, value)
}
/// Create new [`ConfigOptions`], taking values from environment variables
@@ -2157,7 +2171,7 @@ impl TableOptions {
///
/// A result indicating success or failure in setting the configuration
option.
pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
- let Some((prefix, _)) = key.split_once('.') else {
+ let Some((mut prefix, _)) = key.split_once('.') else {
return _config_err!("could not find config namespace for key
\"{key}\"");
};
@@ -2169,6 +2183,15 @@ impl TableOptions {
return Ok(());
}
+ if !self.extensions.0.contains_key(prefix)
+ && self
+ .extensions
+ .0
+ .contains_key(DATAFUSION_FFI_CONFIG_NAMESPACE)
+ {
+ prefix = DATAFUSION_FFI_CONFIG_NAMESPACE;
+ }
+
let Some(e) = self.extensions.0.get_mut(prefix) else {
return _config_err!("Could not find config namespace
\"{prefix}\"");
};
diff --git a/datafusion/ffi/src/config/extension_options.rs
b/datafusion/ffi/src/config/extension_options.rs
new file mode 100644
index 0000000000..48fd4e7109
--- /dev/null
+++ b/datafusion/ffi/src/config/extension_options.rs
@@ -0,0 +1,288 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::collections::HashMap;
+use std::ffi::c_void;
+
+use abi_stable::StableAbi;
+use abi_stable::std_types::{RResult, RStr, RString, RVec, Tuple2};
+use datafusion_common::config::{ConfigEntry, ConfigExtension,
ExtensionOptions};
+use datafusion_common::{Result, exec_err};
+
+use crate::df_result;
+
+/// A stable struct for sharing [`ExtensionOptions`] across FFI boundaries.
+///
+/// Unlike other FFI structs in this crate, we do not construct a foreign
+/// variant of this object. This is due to the typical method for interacting
+/// with extension options is by creating a local struct of your concrete type.
+/// To support this methodology use the `to_extension` method instead.
+///
+/// When using [`FFI_ExtensionOptions`] with multiple extensions, all extension
+/// values are stored on a single [`FFI_ExtensionOptions`] object. The keys
+/// are stored with the full path prefix to avoid overwriting values when using
+/// multiple extensions.
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+pub struct FFI_ExtensionOptions {
+ /// Return a deep clone of this [`ExtensionOptions`]
+ pub cloned: unsafe extern "C" fn(&Self) -> FFI_ExtensionOptions,
+
+ /// Set the given `key`, `value` pair
+ pub set:
+ unsafe extern "C" fn(&mut Self, key: RStr, value: RStr) -> RResult<(),
RString>,
+
+ /// Returns the [`ConfigEntry`] stored in this [`ExtensionOptions`]
+ pub entries: unsafe extern "C" fn(&Self) -> RVec<Tuple2<RString, RString>>,
+
+ /// Release the memory of the private data when it is no longer being used.
+ pub release: unsafe extern "C" fn(&mut Self),
+
+ /// Internal data. This is only to be accessed by the provider of the
options.
+ pub private_data: *mut c_void,
+}
+
+unsafe impl Send for FFI_ExtensionOptions {}
+unsafe impl Sync for FFI_ExtensionOptions {}
+
+pub struct ExtensionOptionsPrivateData {
+ pub options: HashMap<String, String>,
+}
+
+impl FFI_ExtensionOptions {
+ #[inline]
+ fn inner_mut(&mut self) -> &mut HashMap<String, String> {
+ let private_data = self.private_data as *mut
ExtensionOptionsPrivateData;
+ unsafe { &mut (*private_data).options }
+ }
+
+ #[inline]
+ fn inner(&self) -> &HashMap<String, String> {
+ let private_data = self.private_data as *const
ExtensionOptionsPrivateData;
+ unsafe { &(*private_data).options }
+ }
+}
+
+unsafe extern "C" fn cloned_fn_wrapper(
+ options: &FFI_ExtensionOptions,
+) -> FFI_ExtensionOptions {
+ options
+ .inner()
+ .iter()
+ .map(|(k, v)| (k.to_owned(), v.to_owned()))
+ .collect::<HashMap<String, String>>()
+ .into()
+}
+
+unsafe extern "C" fn set_fn_wrapper(
+ options: &mut FFI_ExtensionOptions,
+ key: RStr,
+ value: RStr,
+) -> RResult<(), RString> {
+ let _ = options.inner_mut().insert(key.into(), value.into());
+ RResult::ROk(())
+}
+
+unsafe extern "C" fn entries_fn_wrapper(
+ options: &FFI_ExtensionOptions,
+) -> RVec<Tuple2<RString, RString>> {
+ options
+ .inner()
+ .iter()
+ .map(|(key, value)| (key.to_owned().into(),
value.to_owned().into()).into())
+ .collect()
+}
+
+unsafe extern "C" fn release_fn_wrapper(options: &mut FFI_ExtensionOptions) {
+ unsafe {
+ debug_assert!(!options.private_data.is_null());
+ let private_data =
+ Box::from_raw(options.private_data as *mut
ExtensionOptionsPrivateData);
+ drop(private_data);
+ options.private_data = std::ptr::null_mut();
+ }
+}
+
+impl Default for FFI_ExtensionOptions {
+ fn default() -> Self {
+ HashMap::new().into()
+ }
+}
+
+impl From<HashMap<String, String>> for FFI_ExtensionOptions {
+ fn from(options: HashMap<String, String>) -> Self {
+ let private_data = ExtensionOptionsPrivateData { options };
+
+ Self {
+ cloned: cloned_fn_wrapper,
+ set: set_fn_wrapper,
+ entries: entries_fn_wrapper,
+ release: release_fn_wrapper,
+ private_data: Box::into_raw(Box::new(private_data)) as *mut c_void,
+ }
+ }
+}
+
+impl Drop for FFI_ExtensionOptions {
+ fn drop(&mut self) {
+ unsafe { (self.release)(self) }
+ }
+}
+
+impl Clone for FFI_ExtensionOptions {
+ fn clone(&self) -> Self {
+ unsafe { (self.cloned)(self) }
+ }
+}
+
+impl ConfigExtension for FFI_ExtensionOptions {
+ const PREFIX: &'static str =
+ datafusion_common::config::DATAFUSION_FFI_CONFIG_NAMESPACE;
+}
+
+impl ExtensionOptions for FFI_ExtensionOptions {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn as_any_mut(&mut self) -> &mut dyn Any {
+ self
+ }
+
+ fn cloned(&self) -> Box<dyn ExtensionOptions> {
+ let ffi_options = unsafe { (self.cloned)(self) };
+ Box::new(ffi_options)
+ }
+
+ fn set(&mut self, key: &str, value: &str) -> Result<()> {
+ if key.split_once('.').is_none() {
+ return exec_err!("Unable to set FFI config value without namespace
set");
+ };
+
+ df_result!(unsafe { (self.set)(self, key.into(), value.into()) })
+ }
+
+ fn entries(&self) -> Vec<ConfigEntry> {
+ unsafe {
+ (self.entries)(self)
+ .into_iter()
+ .map(|entry_tuple| ConfigEntry {
+ key: entry_tuple.0.into(),
+ value: Some(entry_tuple.1.into()),
+ description: "ffi_config_options",
+ })
+ .collect()
+ }
+ }
+}
+
+impl FFI_ExtensionOptions {
+ /// Add all of the values in a concrete configuration extension to the
+ /// FFI variant. This is safe to call on either side of the FFI
+ /// boundary.
+ pub fn add_config<C: ConfigExtension>(&mut self, config: &C) -> Result<()>
{
+ for entry in config.entries() {
+ if let Some(value) = entry.value {
+ let key = format!("{}.{}", C::PREFIX, entry.key);
+ self.set(key.as_str(), value.as_str())?;
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Merge another `FFI_ExtensionOptions` configurations into this one.
+ /// This is safe to call on either side of the FFI boundary.
+ pub fn merge(&mut self, other: &FFI_ExtensionOptions) -> Result<()> {
+ for entry in other.entries() {
+ if let Some(value) = entry.value {
+ self.set(entry.key.as_str(), value.as_str())?;
+ }
+ }
+ Ok(())
+ }
+
+ /// Create a concrete extension type from the FFI variant.
+ /// This is safe to call on either side of the FFI boundary.
+ pub fn to_extension<C: ConfigExtension + Default>(&self) -> Result<C> {
+ let mut result = C::default();
+
+ unsafe {
+ for entry in (self.entries)(self) {
+ let key = entry.0.as_str();
+ let value = entry.1.as_str();
+
+ if let Some((prefix, inner_key)) = key.split_once('.')
+ && prefix == C::PREFIX
+ {
+ result.set(inner_key, value)?;
+ }
+ }
+ }
+
+ Ok(result)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use datafusion_common::config::{ConfigExtension, ConfigOptions};
+ use datafusion_common::extensions_options;
+
+ use crate::config::extension_options::FFI_ExtensionOptions;
+
+ // Define a new configuration struct using the `extensions_options` macro
+ extensions_options! {
+ /// My own config options.
+ pub struct MyConfig {
+ /// Should "foo" be replaced by "bar"?
+ pub foo_to_bar: bool, default = true
+
+ /// How many "baz" should be created?
+ pub baz_count: usize, default = 1337
+ }
+ }
+
+ impl ConfigExtension for MyConfig {
+ const PREFIX: &'static str = "my_config";
+ }
+
+ #[test]
+ fn round_trip_ffi_extension_options() {
+ // set up config struct and register extension
+ let mut config = ConfigOptions::default();
+ let mut ffi_options = FFI_ExtensionOptions::default();
+ ffi_options.add_config(&MyConfig::default()).unwrap();
+
+ config.extensions.insert(ffi_options);
+
+ // overwrite config default
+ config.set("my_config.baz_count", "42").unwrap();
+
+ // check config state
+ let returned_ffi_config =
+ config.extensions.get::<FFI_ExtensionOptions>().unwrap();
+ let my_config: MyConfig = returned_ffi_config.to_extension().unwrap();
+
+ // check default value
+ assert!(my_config.foo_to_bar);
+
+ // check overwritten value
+ assert_eq!(my_config.baz_count, 42);
+ }
+}
diff --git a/datafusion/ffi/src/config/mod.rs b/datafusion/ffi/src/config/mod.rs
new file mode 100644
index 0000000000..850a4dc337
--- /dev/null
+++ b/datafusion/ffi/src/config/mod.rs
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod extension_options;
+
+use abi_stable::StableAbi;
+use abi_stable::std_types::{RHashMap, RString};
+use datafusion_common::config::{
+ ConfigExtension, ConfigOptions, ExtensionOptions, TableOptions,
+};
+use datafusion_common::{DataFusionError, Result};
+
+use crate::config::extension_options::FFI_ExtensionOptions;
+
+/// A stable struct for sharing [`ConfigOptions`] across FFI boundaries.
+///
+/// Accessing FFI extension options require a slightly different pattern
+/// than local extensions. The trait [`ExtensionOptionsFFIProvider`] can
+/// be used to simplify accessing FFI extensions.
+#[repr(C)]
+#[derive(Debug, Clone, StableAbi)]
+pub struct FFI_ConfigOptions {
+ base_options: RHashMap<RString, RString>,
+
+ extensions: FFI_ExtensionOptions,
+}
+
+impl From<&ConfigOptions> for FFI_ConfigOptions {
+ fn from(options: &ConfigOptions) -> Self {
+ let base_options: RHashMap<RString, RString> = options
+ .entries()
+ .into_iter()
+ .filter_map(|entry| entry.value.map(|value| (entry.key, value)))
+ .map(|(key, value)| (key.into(), value.into()))
+ .collect();
+
+ let mut extensions = FFI_ExtensionOptions::default();
+ for (extension_name, extension) in options.extensions.iter() {
+ for entry in extension.entries().iter() {
+ if let Some(value) = entry.value.as_ref() {
+ extensions
+ .set(format!("{extension_name}.{}",
entry.key).as_str(), value)
+ .expect("FFI_ExtensionOptions set should always return
Ok");
+ }
+ }
+ }
+
+ Self {
+ base_options,
+ extensions,
+ }
+ }
+}
+
+impl TryFrom<FFI_ConfigOptions> for ConfigOptions {
+ type Error = DataFusionError;
+ fn try_from(ffi_options: FFI_ConfigOptions) -> Result<Self, Self::Error> {
+ let mut options = ConfigOptions::default();
+ options.extensions.insert(ffi_options.extensions);
+
+ for kv_tuple in ffi_options.base_options.iter() {
+ options.set(kv_tuple.0.as_str(), kv_tuple.1.as_str())?;
+ }
+
+ Ok(options)
+ }
+}
+
+pub trait ExtensionOptionsFFIProvider {
+ /// Extract a [`ConfigExtension`]. This method should attempt to first
extract
+ /// the extension from the local options when possible. Should that fail,
it
+ /// should attempt to extract the FFI options and then convert them to the
+ /// desired [`ConfigExtension`].
+ fn local_or_ffi_extension<C: ConfigExtension + Clone + Default>(&self) ->
Option<C>;
+}
+
+impl ExtensionOptionsFFIProvider for ConfigOptions {
+ fn local_or_ffi_extension<C: ConfigExtension + Clone + Default>(&self) ->
Option<C> {
+ self.extensions
+ .get::<C>()
+ .map(|v| v.to_owned())
+ .or_else(|| {
+ self.extensions
+ .get::<FFI_ExtensionOptions>()
+ .and_then(|ffi_ext| ffi_ext.to_extension().ok())
+ })
+ }
+}
+
+impl ExtensionOptionsFFIProvider for TableOptions {
+ fn local_or_ffi_extension<C: ConfigExtension + Clone + Default>(&self) ->
Option<C> {
+ self.extensions
+ .get::<C>()
+ .map(|v| v.to_owned())
+ .or_else(|| {
+ self.extensions
+ .get::<FFI_ExtensionOptions>()
+ .and_then(|ffi_ext| ffi_ext.to_extension().ok())
+ })
+ }
+}
+
+/// A stable struct for sharing [`TableOptions`] across FFI boundaries.
+///
+/// Accessing FFI extension options require a slightly different pattern
+/// than local extensions. The trait [`ExtensionOptionsFFIProvider`] can
+/// be used to simplify accessing FFI extensions.
+#[repr(C)]
+#[derive(Debug, Clone, StableAbi)]
+pub struct FFI_TableOptions {
+ base_options: RHashMap<RString, RString>,
+
+ extensions: FFI_ExtensionOptions,
+}
+
+impl From<&TableOptions> for FFI_TableOptions {
+ fn from(options: &TableOptions) -> Self {
+ let base_options: RHashMap<RString, RString> = options
+ .entries()
+ .into_iter()
+ .filter_map(|entry| entry.value.map(|value| (entry.key, value)))
+ .map(|(key, value)| (key.into(), value.into()))
+ .collect();
+
+ let mut extensions = FFI_ExtensionOptions::default();
+ for (extension_name, extension) in options.extensions.iter() {
+ for entry in extension.entries().iter() {
+ if let Some(value) = entry.value.as_ref() {
+ extensions
+ .set(format!("{extension_name}.{}",
entry.key).as_str(), value)
+ .expect("FFI_ExtensionOptions set should always return
Ok");
+ }
+ }
+ }
+
+ Self {
+ base_options,
+ extensions,
+ }
+ }
+}
+
+impl TryFrom<FFI_TableOptions> for TableOptions {
+ type Error = DataFusionError;
+ fn try_from(ffi_options: FFI_TableOptions) -> Result<Self, Self::Error> {
+ let mut options = TableOptions::default();
+ options.extensions.insert(ffi_options.extensions);
+
+ for kv_tuple in ffi_options.base_options.iter() {
+ options.set(kv_tuple.0.as_str(), kv_tuple.1.as_str())?;
+ }
+
+ Ok(options)
+ }
+}
diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs
index 5eb3626db1..d7410e8483 100644
--- a/datafusion/ffi/src/lib.rs
+++ b/datafusion/ffi/src/lib.rs
@@ -28,6 +28,7 @@
pub mod arrow_wrappers;
pub mod catalog_provider;
pub mod catalog_provider_list;
+pub mod config;
pub mod execution;
pub mod execution_plan;
pub mod expr;
diff --git a/datafusion/ffi/src/session/config.rs
b/datafusion/ffi/src/session/config.rs
index eb9c4e2c69..63f0f20ecc 100644
--- a/datafusion/ffi/src/session/config.rs
+++ b/datafusion/ffi/src/session/config.rs
@@ -15,11 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-use std::collections::HashMap;
use std::ffi::c_void;
+use crate::config::FFI_ConfigOptions;
use abi_stable::StableAbi;
-use abi_stable::std_types::{RHashMap, RString};
+use datafusion_common::config::ConfigOptions;
use datafusion_common::error::{DataFusionError, Result};
use datafusion_execution::config::SessionConfig;
@@ -37,9 +37,8 @@ use datafusion_execution::config::SessionConfig;
#[repr(C)]
#[derive(Debug, StableAbi)]
pub struct FFI_SessionConfig {
- /// Return a hash map from key to value of the config options represented
- /// by string values.
- pub config_options: unsafe extern "C" fn(config: &Self) ->
RHashMap<RString, RString>,
+ /// FFI stable configuration options.
+ pub config_options: FFI_ConfigOptions,
/// Used to create a clone on the provider of the execution plan. This
should
/// only need to be called by the receiver of the plan.
@@ -67,21 +66,6 @@ impl FFI_SessionConfig {
}
}
-unsafe extern "C" fn config_options_fn_wrapper(
- config: &FFI_SessionConfig,
-) -> RHashMap<RString, RString> {
- let config_options = config.inner().options();
-
- let mut options = RHashMap::default();
- for config_entry in config_options.entries() {
- if let Some(value) = config_entry.value {
- options.insert(config_entry.key.into(), value.into());
- }
- }
-
- options
-}
-
unsafe extern "C" fn release_fn_wrapper(config: &mut FFI_SessionConfig) {
unsafe {
debug_assert!(!config.private_data.is_null());
@@ -100,7 +84,7 @@ unsafe extern "C" fn clone_fn_wrapper(config:
&FFI_SessionConfig) -> FFI_Session
let private_data = Box::new(SessionConfigPrivateData { config:
old_config });
FFI_SessionConfig {
- config_options: config_options_fn_wrapper,
+ config_options: config.config_options.clone(),
private_data: Box::into_raw(private_data) as *mut c_void,
clone: clone_fn_wrapper,
release: release_fn_wrapper,
@@ -119,8 +103,10 @@ impl From<&SessionConfig> for FFI_SessionConfig {
config: session.clone(),
});
+ let config_options =
FFI_ConfigOptions::from(session.options().as_ref());
+
Self {
- config_options: config_options_fn_wrapper,
+ config_options,
private_data: Box::into_raw(private_data) as *mut c_void,
clone: clone_fn_wrapper,
release: release_fn_wrapper,
@@ -149,14 +135,9 @@ impl TryFrom<&FFI_SessionConfig> for SessionConfig {
return Ok(config.inner().clone());
}
- let config_options = unsafe { (config.config_options)(config) };
-
- let mut options_map = HashMap::new();
- config_options.iter().for_each(|kv_pair| {
- options_map.insert(kv_pair.0.to_string(), kv_pair.1.to_string());
- });
+ let config_options =
ConfigOptions::try_from(config.config_options.clone())?;
- SessionConfig::from_string_hash_map(&options_map)
+ Ok(SessionConfig::from(config_options))
}
}
diff --git a/datafusion/ffi/src/tests/config.rs
b/datafusion/ffi/src/tests/config.rs
new file mode 100644
index 0000000000..46fc975620
--- /dev/null
+++ b/datafusion/ffi/src/tests/config.rs
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::config::ConfigExtension;
+use datafusion_common::extensions_options;
+
+use crate::config::extension_options::FFI_ExtensionOptions;
+
+extensions_options! {
+ pub struct ExternalConfig {
+ /// Should "foo" be replaced by "bar"?
+ pub is_enabled: bool, default = true
+
+ /// Some value to be extracted
+ pub base_number: usize, default = 1000
+ }
+}
+
+impl PartialEq for ExternalConfig {
+ fn eq(&self, other: &Self) -> bool {
+ self.base_number == other.base_number && self.is_enabled ==
other.is_enabled
+ }
+}
+impl Eq for ExternalConfig {}
+
+impl ConfigExtension for ExternalConfig {
+ const PREFIX: &'static str = "external_config";
+}
+
+pub(crate) extern "C" fn create_extension_options() -> FFI_ExtensionOptions {
+ let mut extensions = FFI_ExtensionOptions::default();
+ extensions
+ .add_config(&ExternalConfig::default())
+ .expect("add_config should be infallible for ExternalConfig");
+
+ extensions
+}
diff --git a/datafusion/ffi/src/tests/mod.rs b/datafusion/ffi/src/tests/mod.rs
index c936330668..cbee5febdb 100644
--- a/datafusion/ffi/src/tests/mod.rs
+++ b/datafusion/ffi/src/tests/mod.rs
@@ -36,6 +36,7 @@ use udf_udaf_udwf::{
use crate::catalog_provider::FFI_CatalogProvider;
use crate::catalog_provider_list::FFI_CatalogProviderList;
+use crate::config::extension_options::FFI_ExtensionOptions;
use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
use crate::table_provider::FFI_TableProvider;
use crate::table_provider_factory::FFI_TableProviderFactory;
@@ -47,6 +48,7 @@ use crate::udwf::FFI_WindowUDF;
mod async_provider;
pub mod catalog;
+pub mod config;
mod sync_provider;
mod table_provider_factory;
mod udf_udaf_udwf;
@@ -93,6 +95,9 @@ pub struct ForeignLibraryModule {
pub create_rank_udwf: extern "C" fn() -> FFI_WindowUDF,
+ /// Create extension options, for either ConfigOptions or TableOptions
+ pub create_extension_options: extern "C" fn() -> FFI_ExtensionOptions,
+
pub version: extern "C" fn() -> u64,
}
@@ -156,6 +161,7 @@ pub fn get_foreign_library_module() ->
ForeignLibraryModuleRef {
create_sum_udaf: create_ffi_sum_func,
create_stddev_udaf: create_ffi_stddev_func,
create_rank_udwf: create_ffi_rank_func,
+ create_extension_options: config::create_extension_options,
version: super::version,
}
.leak_into_prefix()
diff --git a/datafusion/ffi/tests/ffi_config.rs
b/datafusion/ffi/tests/ffi_config.rs
new file mode 100644
index 0000000000..ca0a3e31e8
--- /dev/null
+++ b/datafusion/ffi/tests/ffi_config.rs
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Add an additional module here for convenience to scope this to only
+/// when the feature integration-tests is built
+#[cfg(feature = "integration-tests")]
+mod tests {
+ use datafusion::error::{DataFusionError, Result};
+ use datafusion_common::ScalarValue;
+ use datafusion_common::config::{ConfigOptions, TableOptions};
+ use datafusion_execution::config::SessionConfig;
+ use datafusion_ffi::config::ExtensionOptionsFFIProvider;
+ use datafusion_ffi::tests::config::ExternalConfig;
+ use datafusion_ffi::tests::utils::get_module;
+
+ #[test]
+ fn test_ffi_config_options_extension() -> Result<()> {
+ let module = get_module()?;
+
+ let extension_options =
+ module
+ .create_extension_options()
+ .ok_or(DataFusionError::NotImplemented(
+ "External test library failed to implement
create_extension_options"
+ .to_string(),
+ ))?();
+
+ let mut config = ConfigOptions::new();
+ config.extensions.insert(extension_options);
+
+ // Verify default values are as expected
+ let returned_config: ExternalConfig = config
+ .local_or_ffi_extension()
+ .expect("should have external config extension");
+ assert_eq!(returned_config, ExternalConfig::default());
+
+ config.set("external_config.is_enabled", "false")?;
+ let returned_config: ExternalConfig = config
+ .local_or_ffi_extension()
+ .expect("should have external config extension");
+ assert!(!returned_config.is_enabled);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_ffi_table_options_extension() -> Result<()> {
+ let module = get_module()?;
+
+ let extension_options =
+ module
+ .create_extension_options()
+ .ok_or(DataFusionError::NotImplemented(
+ "External test library failed to implement
create_extension_options"
+ .to_string(),
+ ))?();
+
+ let mut table_options = TableOptions::new();
+ table_options.extensions.insert(extension_options);
+
+ // Verify default values are as expected
+ let returned_options: ExternalConfig = table_options
+ .local_or_ffi_extension()
+ .expect("should have external config extension");
+
+ assert_eq!(returned_options, ExternalConfig::default());
+
+ table_options.set("external_config.is_enabled", "false")?;
+ let returned_options: ExternalConfig = table_options
+ .local_or_ffi_extension()
+ .expect("should have external config extension");
+ assert!(!returned_options.is_enabled);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_ffi_session_config_options_extension() -> Result<()> {
+ let module = get_module()?;
+
+ let extension_options =
+ module
+ .create_extension_options()
+ .ok_or(DataFusionError::NotImplemented(
+ "External test library failed to implement
create_extension_options"
+ .to_string(),
+ ))?();
+
+ let mut config =
SessionConfig::new().with_option_extension(extension_options);
+
+ // Verify default values are as expected
+ let returned_config: ExternalConfig = config
+ .options()
+ .local_or_ffi_extension()
+ .expect("should have external config extension");
+ assert_eq!(returned_config, ExternalConfig::default());
+
+ config = config.set(
+ "external_config.is_enabled",
+ &ScalarValue::Boolean(Some(false)),
+ );
+ let returned_config: ExternalConfig = config
+ .options()
+ .local_or_ffi_extension()
+ .expect("should have external config extension");
+ assert!(!returned_config.is_enabled);
+
+ Ok(())
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]