This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch branch-51
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-51 by this push:
new e571b49e09 [branch-51] Implement CatalogProviderList in FFI (#18912)
e571b49e09 is described below
commit e571b49e0983892597a8f92e5d1502b17a15b180
Author: Tim Saucer <[email protected]>
AuthorDate: Thu Dec 11 23:03:33 2025 +0100
[branch-51] Implement CatalogProviderList in FFI (#18912)
## Which issue does this PR close?
This is related to https://github.com/apache/datafusion/issues/18843
## Rationale for this change
This is a purely additive feature that users would like to use. It barely
missed the cutoff for 51.0.0, so it is being backported to `branch-51`.
## What changes are included in this PR?
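Adds `FFI_CatalogProviderList` and its receiver-side wrapper `ForeignCatalogProviderList`, which allow a `CatalogProviderList` to be shared across the FFI boundary.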
## Are these changes tested?
Yes. Unit and integration tests are included, and the change is already in `main`.
## Are there any user-facing changes?
No, pure addition
---
datafusion/ffi/src/catalog_provider.rs | 2 +-
datafusion/ffi/src/catalog_provider_list.rs | 283 +++++++++++++++++++++
datafusion/ffi/src/lib.rs | 1 +
datafusion/ffi/src/tests/catalog.rs | 57 ++++-
datafusion/ffi/src/tests/mod.rs | 6 +
.../tests/{ffi_integration.rs => ffi_catalog.rs} | 74 +++---
datafusion/ffi/tests/ffi_integration.rs | 27 --
7 files changed, 375 insertions(+), 75 deletions(-)
diff --git a/datafusion/ffi/src/catalog_provider.rs
b/datafusion/ffi/src/catalog_provider.rs
index 65dcab34f1..d279951783 100644
--- a/datafusion/ffi/src/catalog_provider.rs
+++ b/datafusion/ffi/src/catalog_provider.rs
@@ -204,7 +204,7 @@ impl FFI_CatalogProvider {
/// defined on this struct must only use the stable functions provided in
/// FFI_CatalogProvider to interact with the foreign table provider.
#[derive(Debug)]
-pub struct ForeignCatalogProvider(FFI_CatalogProvider);
+pub struct ForeignCatalogProvider(pub(crate) FFI_CatalogProvider);
unsafe impl Send for ForeignCatalogProvider {}
unsafe impl Sync for ForeignCatalogProvider {}
diff --git a/datafusion/ffi/src/catalog_provider_list.rs
b/datafusion/ffi/src/catalog_provider_list.rs
new file mode 100644
index 0000000000..b09f06d318
--- /dev/null
+++ b/datafusion/ffi/src/catalog_provider_list.rs
@@ -0,0 +1,283 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, ffi::c_void, sync::Arc};
+
+use abi_stable::{
+ std_types::{ROption, RString, RVec},
+ StableAbi,
+};
+use datafusion::catalog::{CatalogProvider, CatalogProviderList};
+use tokio::runtime::Handle;
+
+use crate::catalog_provider::{FFI_CatalogProvider, ForeignCatalogProvider};
+
+/// A stable struct for sharing [`CatalogProviderList`] across FFI boundaries.
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+#[allow(non_camel_case_types)]
+pub struct FFI_CatalogProviderList {
+ /// Register a catalog
+ pub register_catalog: unsafe extern "C" fn(
+ &Self,
+ name: RString,
+ catalog: &FFI_CatalogProvider,
+ ) -> ROption<FFI_CatalogProvider>,
+
+ /// List of existing catalogs
+ pub catalog_names: unsafe extern "C" fn(&Self) -> RVec<RString>,
+
+ /// Access a catalog
+ pub catalog:
+ unsafe extern "C" fn(&Self, name: RString) ->
ROption<FFI_CatalogProvider>,
+
+ /// Used to create a clone on the provider. This should only need to be
called
+ /// by the receiver of the plan.
+ pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
+
+ /// Release the memory of the private data when it is no longer being used.
+ pub release: unsafe extern "C" fn(arg: &mut Self),
+
+ /// Return the major DataFusion version number of this provider.
+ pub version: unsafe extern "C" fn() -> u64,
+
+ /// Internal data. This is only to be accessed by the provider of the plan.
+ /// A [`ForeignCatalogProviderList`] should never attempt to access this
data.
+ pub private_data: *mut c_void,
+}
+
+unsafe impl Send for FFI_CatalogProviderList {}
+unsafe impl Sync for FFI_CatalogProviderList {}
+
+struct ProviderPrivateData {
+ provider: Arc<dyn CatalogProviderList + Send>,
+ runtime: Option<Handle>,
+}
+
+impl FFI_CatalogProviderList {
+ unsafe fn inner(&self) -> &Arc<dyn CatalogProviderList + Send> {
+ let private_data = self.private_data as *const ProviderPrivateData;
+ &(*private_data).provider
+ }
+
+ unsafe fn runtime(&self) -> Option<Handle> {
+ let private_data = self.private_data as *const ProviderPrivateData;
+ (*private_data).runtime.clone()
+ }
+}
+
+unsafe extern "C" fn catalog_names_fn_wrapper(
+ provider: &FFI_CatalogProviderList,
+) -> RVec<RString> {
+ let names = provider.inner().catalog_names();
+ names.into_iter().map(|s| s.into()).collect()
+}
+
+unsafe extern "C" fn register_catalog_fn_wrapper(
+ provider: &FFI_CatalogProviderList,
+ name: RString,
+ catalog: &FFI_CatalogProvider,
+) -> ROption<FFI_CatalogProvider> {
+ let runtime = provider.runtime();
+ let provider = provider.inner();
+ let catalog = Arc::new(ForeignCatalogProvider::from(catalog));
+
+ provider
+ .register_catalog(name.into(), catalog)
+ .map(|catalog| FFI_CatalogProvider::new(catalog, runtime))
+ .into()
+}
+
+unsafe extern "C" fn catalog_fn_wrapper(
+ provider: &FFI_CatalogProviderList,
+ name: RString,
+) -> ROption<FFI_CatalogProvider> {
+ let runtime = provider.runtime();
+ let provider = provider.inner();
+ provider
+ .catalog(name.as_str())
+ .map(|catalog| FFI_CatalogProvider::new(catalog, runtime))
+ .into()
+}
+
+unsafe extern "C" fn release_fn_wrapper(provider: &mut
FFI_CatalogProviderList) {
+ let private_data = Box::from_raw(provider.private_data as *mut
ProviderPrivateData);
+ drop(private_data);
+}
+
+unsafe extern "C" fn clone_fn_wrapper(
+ provider: &FFI_CatalogProviderList,
+) -> FFI_CatalogProviderList {
+ let old_private_data = provider.private_data as *const ProviderPrivateData;
+ let runtime = (*old_private_data).runtime.clone();
+
+ let private_data = Box::into_raw(Box::new(ProviderPrivateData {
+ provider: Arc::clone(&(*old_private_data).provider),
+ runtime,
+ })) as *mut c_void;
+
+ FFI_CatalogProviderList {
+ register_catalog: register_catalog_fn_wrapper,
+ catalog_names: catalog_names_fn_wrapper,
+ catalog: catalog_fn_wrapper,
+ clone: clone_fn_wrapper,
+ release: release_fn_wrapper,
+ version: super::version,
+ private_data,
+ }
+}
+
+impl Drop for FFI_CatalogProviderList {
+ fn drop(&mut self) {
+ unsafe { (self.release)(self) }
+ }
+}
+
+impl FFI_CatalogProviderList {
+ /// Creates a new [`FFI_CatalogProviderList`].
+ pub fn new(
+ provider: Arc<dyn CatalogProviderList + Send>,
+ runtime: Option<Handle>,
+ ) -> Self {
+ let private_data = Box::new(ProviderPrivateData { provider, runtime });
+
+ Self {
+ register_catalog: register_catalog_fn_wrapper,
+ catalog_names: catalog_names_fn_wrapper,
+ catalog: catalog_fn_wrapper,
+ clone: clone_fn_wrapper,
+ release: release_fn_wrapper,
+ version: super::version,
+ private_data: Box::into_raw(private_data) as *mut c_void,
+ }
+ }
+}
+
+/// This wrapper struct exists on the receiver side of the FFI interface, so
it has
+/// no guarantees about being able to access the data in `private_data`. Any
functions
+/// defined on this struct must only use the stable functions provided in
+/// FFI_CatalogProviderList to interact with the foreign catalog provider list.
+#[derive(Debug)]
+pub struct ForeignCatalogProviderList(FFI_CatalogProviderList);
+
+unsafe impl Send for ForeignCatalogProviderList {}
+unsafe impl Sync for ForeignCatalogProviderList {}
+
+impl From<&FFI_CatalogProviderList> for ForeignCatalogProviderList {
+ fn from(provider: &FFI_CatalogProviderList) -> Self {
+ Self(provider.clone())
+ }
+}
+
+impl Clone for FFI_CatalogProviderList {
+ fn clone(&self) -> Self {
+ unsafe { (self.clone)(self) }
+ }
+}
+
+impl CatalogProviderList for ForeignCatalogProviderList {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn register_catalog(
+ &self,
+ name: String,
+ catalog: Arc<dyn CatalogProvider>,
+ ) -> Option<Arc<dyn CatalogProvider>> {
+ unsafe {
+ let catalog = match
catalog.as_any().downcast_ref::<ForeignCatalogProvider>()
+ {
+ Some(s) => &s.0,
+ None => &FFI_CatalogProvider::new(catalog, None),
+ };
+
+ (self.0.register_catalog)(&self.0, name.into(), catalog)
+ .map(|s| Arc::new(ForeignCatalogProvider(s)) as Arc<dyn
CatalogProvider>)
+ .into()
+ }
+ }
+
+ fn catalog_names(&self) -> Vec<String> {
+ unsafe {
+ (self.0.catalog_names)(&self.0)
+ .into_iter()
+ .map(Into::into)
+ .collect()
+ }
+ }
+
+ fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
+ unsafe {
+ (self.0.catalog)(&self.0, name.into())
+ .map(|catalog| {
+ Arc::new(ForeignCatalogProvider(catalog)) as Arc<dyn
CatalogProvider>
+ })
+ .into()
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use datafusion::catalog::{MemoryCatalogProvider,
MemoryCatalogProviderList};
+
+ use super::*;
+
+ #[test]
+ fn test_round_trip_ffi_catalog_provider_list() {
+ let prior_catalog = Arc::new(MemoryCatalogProvider::new());
+
+ let catalog_list = Arc::new(MemoryCatalogProviderList::new());
+ assert!(catalog_list
+ .as_ref()
+ .register_catalog("prior_catalog".to_owned(), prior_catalog)
+ .is_none());
+
+ let ffi_catalog_list = FFI_CatalogProviderList::new(catalog_list,
None);
+
+ let foreign_catalog_list: ForeignCatalogProviderList =
(&ffi_catalog_list).into();
+
+ let prior_catalog_names = foreign_catalog_list.catalog_names();
+ assert_eq!(prior_catalog_names.len(), 1);
+ assert_eq!(prior_catalog_names[0], "prior_catalog");
+
+ // Replace an existing catalog with one of the same name
+ let returned_catalog = foreign_catalog_list.register_catalog(
+ "prior_catalog".to_owned(),
+ Arc::new(MemoryCatalogProvider::new()),
+ );
+ assert!(returned_catalog.is_some());
+ assert_eq!(foreign_catalog_list.catalog_names().len(), 1);
+
+ // Add a new catalog
+ let returned_catalog = foreign_catalog_list.register_catalog(
+ "second_catalog".to_owned(),
+ Arc::new(MemoryCatalogProvider::new()),
+ );
+ assert!(returned_catalog.is_none());
+ assert_eq!(foreign_catalog_list.catalog_names().len(), 2);
+
+ // Retrieve non-existent catalog
+ let returned_catalog =
foreign_catalog_list.catalog("non_existent_catalog");
+ assert!(returned_catalog.is_none());
+
+ // Retrieve valid catalog
+ let returned_catalog = foreign_catalog_list.catalog("second_catalog");
+ assert!(returned_catalog.is_some());
+ }
+}
diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs
index 0c2340e8ce..a809405777 100644
--- a/datafusion/ffi/src/lib.rs
+++ b/datafusion/ffi/src/lib.rs
@@ -26,6 +26,7 @@
pub mod arrow_wrappers;
pub mod catalog_provider;
+pub mod catalog_provider_list;
pub mod execution_plan;
pub mod insert_op;
pub mod plan_properties;
diff --git a/datafusion/ffi/src/tests/catalog.rs
b/datafusion/ffi/src/tests/catalog.rs
index f4293adb41..b6efbdf726 100644
--- a/datafusion/ffi/src/tests/catalog.rs
+++ b/datafusion/ffi/src/tests/catalog.rs
@@ -28,12 +28,13 @@
use std::{any::Any, fmt::Debug, sync::Arc};
use crate::catalog_provider::FFI_CatalogProvider;
+use crate::catalog_provider_list::FFI_CatalogProviderList;
use arrow::datatypes::Schema;
use async_trait::async_trait;
use datafusion::{
catalog::{
- CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider,
SchemaProvider,
- TableProvider,
+ CatalogProvider, CatalogProviderList, MemoryCatalogProvider,
+ MemoryCatalogProviderList, MemorySchemaProvider, SchemaProvider,
TableProvider,
},
common::exec_err,
datasource::MemTable,
@@ -181,3 +182,55 @@ pub(crate) extern "C" fn create_catalog_provider() ->
FFI_CatalogProvider {
let catalog_provider = Arc::new(FixedCatalogProvider::default());
FFI_CatalogProvider::new(catalog_provider, None)
}
+
+/// This catalog provider list is intended only for unit tests. It
prepopulates with one
+/// catalog and only allows for catalogs named after four colors.
+#[derive(Debug)]
+pub struct FixedCatalogProviderList {
+ inner: MemoryCatalogProviderList,
+}
+
+impl Default for FixedCatalogProviderList {
+ fn default() -> Self {
+ let inner = MemoryCatalogProviderList::new();
+
+ let _ = inner.register_catalog(
+ "blue".to_owned(),
+ Arc::new(FixedCatalogProvider::default()),
+ );
+
+ Self { inner }
+ }
+}
+
+impl CatalogProviderList for FixedCatalogProviderList {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn catalog_names(&self) -> Vec<String> {
+ self.inner.catalog_names()
+ }
+
+ fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
+ self.inner.catalog(name)
+ }
+
+ fn register_catalog(
+ &self,
+ name: String,
+ catalog: Arc<dyn CatalogProvider>,
+ ) -> Option<Arc<dyn CatalogProvider>> {
+ if !["blue", "red", "green", "yellow"].contains(&name.as_str()) {
+ log::warn!("FixedCatalogProviderList only provides four catalogs:
blue, red, green, yellow");
+ return None;
+ }
+
+ self.inner.register_catalog(name, catalog)
+ }
+}
+
+pub(crate) extern "C" fn create_catalog_provider_list() ->
FFI_CatalogProviderList {
+ let catalog_provider_list = Arc::new(FixedCatalogProviderList::default());
+ FFI_CatalogProviderList::new(catalog_provider_list, None)
+}
diff --git a/datafusion/ffi/src/tests/mod.rs b/datafusion/ffi/src/tests/mod.rs
index 816086c320..d9b4a61579 100644
--- a/datafusion/ffi/src/tests/mod.rs
+++ b/datafusion/ffi/src/tests/mod.rs
@@ -34,6 +34,8 @@ use crate::udaf::FFI_AggregateUDF;
use crate::udwf::FFI_WindowUDF;
use super::{table_provider::FFI_TableProvider, udf::FFI_ScalarUDF};
+use crate::catalog_provider_list::FFI_CatalogProviderList;
+use crate::tests::catalog::create_catalog_provider_list;
use arrow::array::RecordBatch;
use async_provider::create_async_table_provider;
use datafusion::{
@@ -62,6 +64,9 @@ pub struct ForeignLibraryModule {
/// Construct an opinionated catalog provider
pub create_catalog: extern "C" fn() -> FFI_CatalogProvider,
+ /// Construct an opinionated catalog provider list
+ pub create_catalog_list: extern "C" fn() -> FFI_CatalogProviderList,
+
/// Constructs the table provider
pub create_table: extern "C" fn(synchronous: bool) -> FFI_TableProvider,
@@ -123,6 +128,7 @@ extern "C" fn construct_table_provider(synchronous: bool)
-> FFI_TableProvider {
pub fn get_foreign_library_module() -> ForeignLibraryModuleRef {
ForeignLibraryModule {
create_catalog: create_catalog_provider,
+ create_catalog_list: create_catalog_provider_list,
create_table: construct_table_provider,
create_scalar_udf: create_ffi_abs_func,
create_nullary_udf: create_ffi_random_func,
diff --git a/datafusion/ffi/tests/ffi_integration.rs
b/datafusion/ffi/tests/ffi_catalog.rs
similarity index 50%
copy from datafusion/ffi/tests/ffi_integration.rs
copy to datafusion/ffi/tests/ffi_catalog.rs
index eb53e76bfb..b63d8cbd63 100644
--- a/datafusion/ffi/tests/ffi_integration.rs
+++ b/datafusion/ffi/tests/ffi_catalog.rs
@@ -19,79 +19,63 @@
/// when the feature integration-tests is built
#[cfg(feature = "integration-tests")]
mod tests {
- use datafusion::error::{DataFusionError, Result};
use datafusion::prelude::SessionContext;
+ use datafusion_common::DataFusionError;
use datafusion_ffi::catalog_provider::ForeignCatalogProvider;
- use datafusion_ffi::table_provider::ForeignTableProvider;
- use datafusion_ffi::tests::create_record_batch;
+ use datafusion_ffi::catalog_provider_list::ForeignCatalogProviderList;
use datafusion_ffi::tests::utils::get_module;
use std::sync::Arc;
- /// It is important that this test is in the `tests` directory and not in
the
- /// library directory so we can verify we are building a dynamic library
and
- /// testing it via a different executable.
- async fn test_table_provider(synchronous: bool) -> Result<()> {
- let table_provider_module = get_module()?;
+ #[tokio::test]
+ async fn test_catalog() -> datafusion_common::Result<()> {
+ let module = get_module()?;
- // By calling the code below, the table provided will be created within
- // the module's code.
- let ffi_table_provider = table_provider_module.create_table().ok_or(
- DataFusionError::NotImplemented(
- "External table provider failed to implement
create_table".to_string(),
- ),
- )?(synchronous);
+ let ffi_catalog =
+ module
+ .create_catalog()
+ .ok_or(DataFusionError::NotImplemented(
+ "External catalog provider failed to implement
create_catalog"
+ .to_string(),
+ ))?();
+ let foreign_catalog: ForeignCatalogProvider = (&ffi_catalog).into();
- // In order to access the table provider within this executable, we
need to
- // turn it into a `ForeignTableProvider`.
- let foreign_table_provider: ForeignTableProvider =
(&ffi_table_provider).into();
+ let ctx = SessionContext::default();
+ let _ = ctx.register_catalog("fruit", Arc::new(foreign_catalog));
- let ctx = SessionContext::new();
+ let df = ctx.table("fruit.apple.purchases").await?;
- // Display the data to show the full cycle works.
- ctx.register_table("external_table",
Arc::new(foreign_table_provider))?;
- let df = ctx.table("external_table").await?;
let results = df.collect().await?;
- assert_eq!(results.len(), 3);
- assert_eq!(results[0], create_record_batch(1, 5));
- assert_eq!(results[1], create_record_batch(6, 1));
- assert_eq!(results[2], create_record_batch(7, 5));
+ assert_eq!(results.len(), 2);
+ let num_rows: usize = results.into_iter().map(|rb|
rb.num_rows()).sum();
+ assert_eq!(num_rows, 5);
Ok(())
}
#[tokio::test]
- async fn async_test_table_provider() -> Result<()> {
- test_table_provider(false).await
- }
-
- #[tokio::test]
- async fn sync_test_table_provider() -> Result<()> {
- test_table_provider(true).await
- }
-
- #[tokio::test]
- async fn test_catalog() -> Result<()> {
+ async fn test_catalog_list() -> datafusion_common::Result<()> {
let module = get_module()?;
- let ffi_catalog =
+ let ffi_catalog_list =
module
- .create_catalog()
+ .create_catalog_list()
.ok_or(DataFusionError::NotImplemented(
- "External catalog provider failed to implement
create_catalog"
+ "External catalog provider failed to implement
create_catalog_list"
.to_string(),
))?();
- let foreign_catalog: ForeignCatalogProvider = (&ffi_catalog).into();
+ let foreign_catalog_list: ForeignCatalogProviderList =
(&ffi_catalog_list).into();
let ctx = SessionContext::default();
- let _ = ctx.register_catalog("fruit", Arc::new(foreign_catalog));
+ ctx.register_catalog_list(Arc::new(foreign_catalog_list));
- let df = ctx.table("fruit.apple.purchases").await?;
+ let df = ctx.table("blue.apple.purchases").await?;
let results = df.collect().await?;
- assert!(!results.is_empty());
- assert!(results[0].num_rows() != 0);
+ assert_eq!(results.len(), 2);
+ let num_rows: usize = results.into_iter().map(|rb|
rb.num_rows()).sum();
+ assert_eq!(num_rows, 5);
Ok(())
}
diff --git a/datafusion/ffi/tests/ffi_integration.rs
b/datafusion/ffi/tests/ffi_integration.rs
index eb53e76bfb..7b4d1b1e35 100644
--- a/datafusion/ffi/tests/ffi_integration.rs
+++ b/datafusion/ffi/tests/ffi_integration.rs
@@ -21,7 +21,6 @@
mod tests {
use datafusion::error::{DataFusionError, Result};
use datafusion::prelude::SessionContext;
- use datafusion_ffi::catalog_provider::ForeignCatalogProvider;
use datafusion_ffi::table_provider::ForeignTableProvider;
use datafusion_ffi::tests::create_record_batch;
use datafusion_ffi::tests::utils::get_module;
@@ -69,30 +68,4 @@ mod tests {
async fn sync_test_table_provider() -> Result<()> {
test_table_provider(true).await
}
-
- #[tokio::test]
- async fn test_catalog() -> Result<()> {
- let module = get_module()?;
-
- let ffi_catalog =
- module
- .create_catalog()
- .ok_or(DataFusionError::NotImplemented(
- "External catalog provider failed to implement
create_catalog"
- .to_string(),
- ))?();
- let foreign_catalog: ForeignCatalogProvider = (&ffi_catalog).into();
-
- let ctx = SessionContext::default();
- let _ = ctx.register_catalog("fruit", Arc::new(foreign_catalog));
-
- let df = ctx.table("fruit.apple.purchases").await?;
-
- let results = df.collect().await?;
-
- assert!(!results.is_empty());
- assert!(results[0].num_rows() != 0);
-
- Ok(())
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]