This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new bf984c754 feat(datafusion): Split IcebergTableProvider into static and non-static table provider (#1879)
bf984c754 is described below
commit bf984c754630cd4b536853c81528b9b69a3dab4d
Author: Shawn Chang <[email protected]>
AuthorDate: Thu Nov 27 04:50:40 2025 -0800
feat(datafusion): Split IcebergTableProvider into static and non-static table provider (#1879)
---
bindings/python/Cargo.lock | 1 +
bindings/python/src/datafusion_table_provider.rs | 6 +-
.../tests/shared_tests/datafusion.rs | 4 +-
crates/integrations/datafusion/src/schema.rs | 6 +-
crates/integrations/datafusion/src/table/mod.rs | 468 +++++++++++++++++----
.../datafusion/src/table/table_provider_factory.rs | 8 +-
.../tests/integration_datafusion_test.rs | 12 -
7 files changed, 390 insertions(+), 115 deletions(-)
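
For orientation, a minimal illustrative sketch of how the two providers might be wired into DataFusion after this split. It assumes an existing Arc<dyn Catalog> and an already-loaded Table; the helper name register_providers and the registered table names are invented for the example, not taken from the commit:

use std::sync::Arc;

use datafusion::prelude::SessionContext;
use iceberg_datafusion::{IcebergCatalogProvider, IcebergStaticTableProvider};

async fn register_providers(
    catalog: Arc<dyn iceberg::Catalog>,
    table: iceberg::table::Table,
) -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();

    // Catalog-backed path: tables resolved through the catalog provider now reload
    // metadata from the catalog on every scan and insert.
    let catalog_provider = Arc::new(IcebergCatalogProvider::try_new(catalog).await?);
    ctx.register_catalog("catalog", catalog_provider);

    // Static path: read-only provider pinned to the table's current snapshot,
    // with no catalog round-trips and no write support.
    let static_provider = Arc::new(IcebergStaticTableProvider::try_new_from_table(table).await?);
    ctx.register_table("static_table", static_provider)?;

    ctx.sql("SELECT count(*) FROM static_table").await?.show().await?;
    Ok(())
}
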
diff --git a/bindings/python/Cargo.lock b/bindings/python/Cargo.lock
index 8249414b8..814c9afb3 100644
--- a/bindings/python/Cargo.lock
+++ b/bindings/python/Cargo.lock
@@ -2313,6 +2313,7 @@ dependencies = [
"chrono",
"derive_builder",
"expect-test",
+ "flate2",
"fnv",
"futures",
"itertools 0.13.0",
diff --git a/bindings/python/src/datafusion_table_provider.rs b/bindings/python/src/datafusion_table_provider.rs
index b5e1bf952..8db7223b3 100644
--- a/bindings/python/src/datafusion_table_provider.rs
+++ b/bindings/python/src/datafusion_table_provider.rs
@@ -23,7 +23,7 @@ use datafusion_ffi::table_provider::FFI_TableProvider;
use iceberg::TableIdent;
use iceberg::io::FileIO;
use iceberg::table::StaticTable;
-use iceberg_datafusion::table::IcebergTableProvider;
+use iceberg_datafusion::table::IcebergStaticTableProvider;
use pyo3::exceptions::PyRuntimeError;
use pyo3::prelude::*;
use pyo3::types::PyCapsule;
@@ -32,7 +32,7 @@ use crate::runtime::runtime;
#[pyclass(name = "IcebergDataFusionTable")]
pub struct PyIcebergDataFusionTable {
- inner: Arc<IcebergTableProvider>,
+ inner: Arc<IcebergStaticTableProvider>,
}
#[pymethods]
@@ -69,7 +69,7 @@ impl PyIcebergDataFusionTable {
let table = static_table.into_table();
- IcebergTableProvider::try_new_from_table(table)
+ IcebergStaticTableProvider::try_new_from_table(table)
.await
.map_err(|e| {
PyRuntimeError::new_err(format!("Failed to create table provider: {e}"))
diff --git a/crates/integration_tests/tests/shared_tests/datafusion.rs b/crates/integration_tests/tests/shared_tests/datafusion.rs
index 81bbb5f54..60dd9f36c 100644
--- a/crates/integration_tests/tests/shared_tests/datafusion.rs
+++ b/crates/integration_tests/tests/shared_tests/datafusion.rs
@@ -26,7 +26,7 @@ use datafusion::error::DataFusionError;
use datafusion::prelude::SessionContext;
use iceberg::{Catalog, CatalogBuilder, TableIdent};
use iceberg_catalog_rest::RestCatalogBuilder;
-use iceberg_datafusion::IcebergTableProvider;
+use iceberg_datafusion::IcebergStaticTableProvider;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use crate::get_shared_containers;
@@ -47,7 +47,7 @@ async fn test_basic_queries() -> Result<(), DataFusionError> {
let ctx = SessionContext::new();
let table_provider = Arc::new(
- IcebergTableProvider::try_new_from_table(table)
+ IcebergStaticTableProvider::try_new_from_table(table)
.await
.unwrap(),
);
diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs
index 3920ee73c..31bbdbd67 100644
--- a/crates/integrations/datafusion/src/schema.rs
+++ b/crates/integrations/datafusion/src/schema.rs
@@ -28,6 +28,7 @@ use iceberg::inspect::MetadataTableType;
use iceberg::{Catalog, NamespaceIdent, Result};
use crate::table::IcebergTableProvider;
+use crate::to_datafusion_error;
/// Represents a [`SchemaProvider`] for the Iceberg [`Catalog`], managing
/// access to table providers within a specific namespace.
@@ -113,7 +114,10 @@ impl SchemaProvider for IcebergSchemaProvider {
let metadata_table_type =
MetadataTableType::try_from(metadata_table_name).map_err(DataFusionError::Plan)?;
if let Some(table) = self.tables.get(table_name) {
- let metadata_table = table.metadata_table(metadata_table_type);
+ let metadata_table = table
+ .metadata_table(metadata_table_type)
+ .await
+ .map_err(to_datafusion_error)?;
return Ok(Some(Arc::new(metadata_table)));
} else {
return Ok(None);
diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs
index 42a3baad3..8527668d6 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -15,6 +15,16 @@
// specific language governing permissions and limitations
// under the License.
+//! Iceberg table providers for DataFusion.
+//!
+//! This module provides two table provider implementations:
+//!
+//! - [`IcebergTableProvider`]: Catalog-backed provider with automatic metadata refresh.
+//! Use for write operations and when you need to see the latest table state.
+//!
+//! - [`IcebergStaticTableProvider`]: Static provider for read-only access to a specific
+//!   table snapshot. Use for consistent analytical queries or time-travel scenarios.
+
pub mod metadata_table;
pub mod table_provider_factory;
@@ -38,98 +48,61 @@ use iceberg::table::Table;
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
use metadata_table::IcebergMetadataTableProvider;
+use crate::error::to_datafusion_error;
use crate::physical_plan::commit::IcebergCommitExec;
use crate::physical_plan::project::project_with_partition;
use crate::physical_plan::repartition::repartition;
use crate::physical_plan::scan::IcebergTableScan;
use crate::physical_plan::write::IcebergWriteExec;
-/// Represents a [`TableProvider`] for the Iceberg [`Catalog`],
-/// managing access to a [`Table`].
+/// Catalog-backed table provider with automatic metadata refresh.
+///
+/// This provider loads fresh table metadata from the catalog on every scan and write
+/// operation, ensuring you always see the latest table state. Use this when you need
+/// write operations or want to see the most up-to-date data.
+///
+/// For read-only access to a specific snapshot without catalog overhead, use
+/// [`IcebergStaticTableProvider`] instead.
#[derive(Debug, Clone)]
pub struct IcebergTableProvider {
- /// A table in the catalog.
- table: Table,
- /// Table snapshot id that will be queried via this provider.
- snapshot_id: Option<i64>,
- /// A reference-counted arrow `Schema`.
+ /// The catalog that manages this table
+ catalog: Arc<dyn Catalog>,
+ /// The table identifier (namespace + name)
+ table_ident: TableIdent,
+ /// A reference-counted arrow `Schema` (cached at construction)
schema: ArrowSchemaRef,
- /// The catalog that the table belongs to.
- catalog: Option<Arc<dyn Catalog>>,
}
impl IcebergTableProvider {
- pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self {
- IcebergTableProvider {
- table,
- snapshot_id: None,
- schema,
- catalog: None,
- }
- }
- /// Asynchronously tries to construct a new [`IcebergTableProvider`]
- /// using the given client and table name to fetch an actual [`Table`]
- /// in the provided namespace.
+ /// Creates a new catalog-backed table provider.
+ ///
+ /// Loads the table once to get the initial schema, then stores the catalog
+ /// reference for future metadata refreshes on each operation.
pub(crate) async fn try_new(
- client: Arc<dyn Catalog>,
+ catalog: Arc<dyn Catalog>,
namespace: NamespaceIdent,
name: impl Into<String>,
) -> Result<Self> {
- let ident = TableIdent::new(namespace, name.into());
- let table = client.load_table(&ident).await?;
+ let table_ident = TableIdent::new(namespace, name.into());
+ // Load table once to get initial schema
+ let table = catalog.load_table(&table_ident).await?;
let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
Ok(IcebergTableProvider {
- table,
- snapshot_id: None,
- schema,
- catalog: Some(client),
- })
- }
-
- /// Asynchronously tries to construct a new [`IcebergTableProvider`]
- /// using the given table. Can be used to create a table provider from an existing table regardless of the catalog implementation.
- pub async fn try_new_from_table(table: Table) -> Result<Self> {
- let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
- Ok(IcebergTableProvider {
- table,
- snapshot_id: None,
- schema,
- catalog: None,
- })
- }
-
- /// Asynchronously tries to construct a new [`IcebergTableProvider`]
- /// using a specific snapshot of the given table. Can be used to create a table provider from an existing table regardless of the catalog implementation.
- pub async fn try_new_from_table_snapshot(table: Table, snapshot_id: i64) -> Result<Self> {
- let snapshot = table
- .metadata()
- .snapshot_by_id(snapshot_id)
- .ok_or_else(|| {
- Error::new(
- ErrorKind::Unexpected,
- format!(
- "snapshot id {snapshot_id} not found in table {}",
- table.identifier().name()
- ),
- )
- })?;
- let schema = snapshot.schema(table.metadata())?;
- let schema = Arc::new(schema_to_arrow_schema(&schema)?);
- Ok(IcebergTableProvider {
- table,
- snapshot_id: Some(snapshot_id),
+ catalog,
+ table_ident,
schema,
- catalog: None,
})
}
- pub(crate) fn metadata_table(&self, r#type: MetadataTableType) -> IcebergMetadataTableProvider {
- IcebergMetadataTableProvider {
- table: self.table.clone(),
- r#type,
- }
+ pub(crate) async fn metadata_table(
+ &self,
+ r#type: MetadataTableType,
+ ) -> Result<IcebergMetadataTableProvider> {
+ // Load fresh table metadata for metadata table access
+ let table = self.catalog.load_table(&self.table_ident).await?;
+ Ok(IcebergMetadataTableProvider { table, r#type })
}
}
@@ -154,9 +127,17 @@ impl TableProvider for IcebergTableProvider {
filters: &[Expr],
_limit: Option<usize>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
+ // Load fresh table metadata from catalog
+ let table = self
+ .catalog
+ .load_table(&self.table_ident)
+ .await
+ .map_err(to_datafusion_error)?;
+
+ // Create scan with fresh metadata (always use current snapshot)
Ok(Arc::new(IcebergTableScan::new(
- self.table.clone(),
- self.snapshot_id,
+ table,
+ None, // Always use current snapshot for catalog-backed provider
self.schema.clone(),
projection,
filters,
@@ -177,17 +158,18 @@ impl TableProvider for IcebergTableProvider {
input: Arc<dyn ExecutionPlan>,
_insert_op: InsertOp,
) -> DFResult<Arc<dyn ExecutionPlan>> {
- let Some(catalog) = self.catalog.clone() else {
- return Err(DataFusionError::Execution(
- "Catalog cannot be none for insert_into".to_string(),
- ));
- };
+ // Load fresh table metadata from catalog
+ let table = self
+ .catalog
+ .load_table(&self.table_ident)
+ .await
+ .map_err(to_datafusion_error)?;
- let partition_spec = self.table.metadata().default_partition_spec();
+ let partition_spec = table.metadata().default_partition_spec();
// Step 1: Project partition values for partitioned tables
let plan_with_partition = if !partition_spec.is_unpartitioned() {
- project_with_partition(input, &self.table)?
+ project_with_partition(input, &table)?
} else {
input
};
@@ -200,14 +182,11 @@ impl TableProvider for IcebergTableProvider {
)
})?;
- let repartitioned_plan = repartition(
- plan_with_partition,
- self.table.metadata_ref(),
- target_partitions,
- )?;
+ let repartitioned_plan =
+ repartition(plan_with_partition, table.metadata_ref(), target_partitions)?;
let write_plan = Arc::new(IcebergWriteExec::new(
- self.table.clone(),
+ table.clone(),
repartitioned_plan,
self.schema.clone(),
));
@@ -216,21 +195,139 @@ impl TableProvider for IcebergTableProvider {
let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(write_plan));
Ok(Arc::new(IcebergCommitExec::new(
- self.table.clone(),
- catalog,
+ table,
+ self.catalog.clone(),
coalesce_partitions,
self.schema.clone(),
)))
}
}
+/// Static table provider for read-only snapshot access.
+///
+/// This provider holds a cached table instance and does not refresh metadata or support
+/// write operations. Use this for consistent analytical queries, time-travel scenarios,
+/// or when you want to avoid catalog overhead.
+///
+/// For catalog-backed tables with write support and automatic refresh, use
+/// [`IcebergTableProvider`] instead.
+#[derive(Debug, Clone)]
+pub struct IcebergStaticTableProvider {
+ /// The static table instance (never refreshed)
+ table: Table,
+ /// Optional snapshot ID for this static view
+ snapshot_id: Option<i64>,
+ /// A reference-counted arrow `Schema`
+ schema: ArrowSchemaRef,
+}
+
+impl IcebergStaticTableProvider {
+ /// Creates a static provider from a table instance.
+ ///
+ /// Uses the table's current snapshot for all queries. Does not support write operations.
+ pub async fn try_new_from_table(table: Table) -> Result<Self> {
+ let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
+ Ok(IcebergStaticTableProvider {
+ table,
+ snapshot_id: None,
+ schema,
+ })
+ }
+
+ /// Creates a static provider for a specific table snapshot.
+ ///
+ /// Queries the specified snapshot for all operations. Useful for time-travel queries.
+ /// Does not support write operations.
+ pub async fn try_new_from_table_snapshot(table: Table, snapshot_id: i64) -> Result<Self> {
+ let snapshot = table
+ .metadata()
+ .snapshot_by_id(snapshot_id)
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::Unexpected,
+ format!(
+ "snapshot id {snapshot_id} not found in table {}",
+ table.identifier().name()
+ ),
+ )
+ })?;
+ let table_schema = snapshot.schema(table.metadata())?;
+ let schema = Arc::new(schema_to_arrow_schema(&table_schema)?);
+ Ok(IcebergStaticTableProvider {
+ table,
+ snapshot_id: Some(snapshot_id),
+ schema,
+ })
+ }
+}
+
+#[async_trait]
+impl TableProvider for IcebergStaticTableProvider {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> ArrowSchemaRef {
+ self.schema.clone()
+ }
+
+ fn table_type(&self) -> TableType {
+ TableType::Base
+ }
+
+ async fn scan(
+ &self,
+ _state: &dyn Session,
+ projection: Option<&Vec<usize>>,
+ filters: &[Expr],
+ _limit: Option<usize>,
+ ) -> DFResult<Arc<dyn ExecutionPlan>> {
+ // Use cached table (no refresh)
+ Ok(Arc::new(IcebergTableScan::new(
+ self.table.clone(),
+ self.snapshot_id,
+ self.schema.clone(),
+ projection,
+ filters,
+ )))
+ }
+
+ fn supports_filters_pushdown(
+ &self,
+ filters: &[&Expr],
+ ) -> DFResult<Vec<TableProviderFilterPushDown>> {
+ // Push down all filters; as a single source of truth, the scanner will drop the filters which couldn't be pushed down
+ Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
+ }
+
+ async fn insert_into(
+ &self,
+ _state: &dyn Session,
+ _input: Arc<dyn ExecutionPlan>,
+ _insert_op: InsertOp,
+ ) -> DFResult<Arc<dyn ExecutionPlan>> {
+ Err(to_datafusion_error(Error::new(
+ ErrorKind::FeatureUnsupported,
+ "Write operations are not supported on IcebergStaticTableProvider.
\
+ Use IcebergTableProvider with a catalog for write support."
+ .to_string(),
+ )))
+ }
+}
+
#[cfg(test)]
mod tests {
+ use std::collections::HashMap;
+ use std::sync::Arc;
+
use datafusion::common::Column;
use datafusion::prelude::SessionContext;
- use iceberg::TableIdent;
use iceberg::io::FileIO;
+ use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
+ use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg::table::{StaticTable, Table};
+ use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent};
+ use tempfile::TempDir;
use super::*;
@@ -253,10 +350,59 @@ mod tests {
static_table.into_table()
}
+ async fn get_test_catalog_and_table() -> (Arc<dyn Catalog>, NamespaceIdent, String, TempDir) {
+ let temp_dir = TempDir::new().unwrap();
+ let warehouse_path = temp_dir.path().to_str().unwrap().to_string();
+
+ let catalog = MemoryCatalogBuilder::default()
+ .load(
+ "memory",
+ HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_path.clone())]),
+ )
+ .await
+ .unwrap();
+
+ let namespace = NamespaceIdent::new("test_ns".to_string());
+ catalog
+ .create_namespace(&namespace, HashMap::new())
+ .await
+ .unwrap();
+
+ let schema = Schema::builder()
+ .with_schema_id(0)
+ .with_fields(vec![
+ NestedField::required(1, "id",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::required(2, "name",
Type::Primitive(PrimitiveType::String)).into(),
+ ])
+ .build()
+ .unwrap();
+
+ let table_creation = TableCreation::builder()
+ .name("test_table".to_string())
+ .location(format!("{}/test_table", warehouse_path))
+ .schema(schema)
+ .properties(HashMap::new())
+ .build();
+
+ catalog
+ .create_table(&namespace, table_creation)
+ .await
+ .unwrap();
+
+ (
+ Arc::new(catalog),
+ namespace,
+ "test_table".to_string(),
+ temp_dir,
+ )
+ }
+
+ // Tests for IcebergStaticTableProvider
+
#[tokio::test]
- async fn test_try_new_from_table() {
+ async fn test_static_provider_from_table() {
let table = get_test_table_from_metadata_file().await;
- let table_provider = IcebergTableProvider::try_new_from_table(table.clone())
+ let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
.await
.unwrap();
let ctx = SessionContext::new();
@@ -278,11 +424,11 @@ mod tests {
}
#[tokio::test]
- async fn test_try_new_from_table_snapshot() {
+ async fn test_static_provider_from_snapshot() {
let table = get_test_table_from_metadata_file().await;
let snapshot_id = table.metadata().snapshots().next().unwrap().snapshot_id();
let table_provider =
- IcebergTableProvider::try_new_from_table_snapshot(table.clone(), snapshot_id)
+ IcebergStaticTableProvider::try_new_from_table_snapshot(table.clone(), snapshot_id)
.await
.unwrap();
let ctx = SessionContext::new();
@@ -304,16 +450,152 @@ mod tests {
}
#[tokio::test]
- async fn test_physical_input_schema_consistent_with_logical_input_schema() {
+ async fn test_static_provider_rejects_writes() {
+ let table = get_test_table_from_metadata_file().await;
+ let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
+ .await
+ .unwrap();
+ let ctx = SessionContext::new();
+ ctx.register_table("mytable", Arc::new(table_provider))
+ .unwrap();
+
+ // Attempt to insert into the static provider should fail
+ let result = ctx.sql("INSERT INTO mytable VALUES (1, 2, 3)").await;
+
+ // The error should occur during planning or execution
+ // We expect an error indicating write operations are not supported
+ assert!(
+ result.is_err() || {
+ let df = result.unwrap();
+ df.collect().await.is_err()
+ }
+ );
+ }
+
+ #[tokio::test]
+ async fn test_static_provider_scan() {
let table = get_test_table_from_metadata_file().await;
- let table_provider = IcebergTableProvider::try_new_from_table(table.clone())
+ let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
.await
.unwrap();
let ctx = SessionContext::new();
ctx.register_table("mytable", Arc::new(table_provider))
.unwrap();
+
+ // Test that scan operations work correctly
let df = ctx.sql("SELECT count(*) FROM mytable").await.unwrap();
let physical_plan = df.create_physical_plan().await;
- assert!(physical_plan.is_ok())
+ assert!(physical_plan.is_ok());
+ }
+
+ // Tests for IcebergTableProvider
+
+ #[tokio::test]
+ async fn test_catalog_backed_provider_creation() {
+ let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;
+
+ // Test creating a catalog-backed provider
+ let provider =
+ IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
+ .await
+ .unwrap();
+
+ // Verify the schema is loaded correctly
+ let schema = provider.schema();
+ assert_eq!(schema.fields().len(), 2);
+ assert_eq!(schema.field(0).name(), "id");
+ assert_eq!(schema.field(1).name(), "name");
+ }
+
+ #[tokio::test]
+ async fn test_catalog_backed_provider_scan() {
+ let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;
+
+ let provider =
+ IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
+ .await
+ .unwrap();
+
+ let ctx = SessionContext::new();
+ ctx.register_table("test_table", Arc::new(provider))
+ .unwrap();
+
+ // Test that scan operations work correctly
+ let df = ctx.sql("SELECT * FROM test_table").await.unwrap();
+
+ // Verify the schema in the query result
+ let df_schema = df.schema();
+ assert_eq!(df_schema.fields().len(), 2);
+ assert_eq!(df_schema.field(0).name(), "id");
+ assert_eq!(df_schema.field(1).name(), "name");
+
+ let physical_plan = df.create_physical_plan().await;
+ assert!(physical_plan.is_ok());
+ }
+
+ #[tokio::test]
+ async fn test_catalog_backed_provider_insert() {
+ let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;
+
+ let provider =
+ IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
+ .await
+ .unwrap();
+
+ let ctx = SessionContext::new();
+ ctx.register_table("test_table", Arc::new(provider))
+ .unwrap();
+
+ // Test that insert operations work correctly
+ let result = ctx.sql("INSERT INTO test_table VALUES (1,
'test')").await;
+
+ // Insert should succeed (or at least not fail during planning)
+ assert!(result.is_ok());
+
+ // Try to execute the insert plan
+ let df = result.unwrap();
+ let execution_result = df.collect().await;
+
+ // The execution should succeed
+ assert!(execution_result.is_ok());
+ }
+
+ #[tokio::test]
+ async fn test_physical_input_schema_consistent_with_logical_input_schema() {
+ let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;
+
+ let provider =
+ IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
+ .await
+ .unwrap();
+
+ let ctx = SessionContext::new();
+ ctx.register_table("test_table", Arc::new(provider))
+ .unwrap();
+
+ // Create a query plan
+ let df = ctx.sql("SELECT id, name FROM test_table").await.unwrap();
+
+ // Get logical schema before consuming df
+ let logical_schema = df.schema().clone();
+
+ // Get physical plan (this consumes df)
+ let physical_plan = df.create_physical_plan().await.unwrap();
+ let physical_schema = physical_plan.schema();
+
+ // Verify that logical and physical schemas are consistent
+ assert_eq!(
+ logical_schema.fields().len(),
+ physical_schema.fields().len()
+ );
+
+ for (logical_field, physical_field) in logical_schema
+ .fields()
+ .iter()
+ .zip(physical_schema.fields().iter())
+ {
+ assert_eq!(logical_field.name(), physical_field.name());
+ assert_eq!(logical_field.data_type(), physical_field.data_type());
+ }
}
}
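
A similar illustrative sketch for the time-travel path of the new static provider, assuming a loaded Table with at least one snapshot; the snapshot selection simply mirrors the test above and query_snapshot is an invented helper name:

use std::sync::Arc;

use datafusion::prelude::SessionContext;
use iceberg_datafusion::IcebergStaticTableProvider;

async fn query_snapshot(table: iceberg::table::Table) -> Result<(), Box<dyn std::error::Error>> {
    // Pin the provider to one recorded snapshot id (here: the first one the metadata yields).
    let snapshot_id = table
        .metadata()
        .snapshots()
        .next()
        .expect("table has no snapshots")
        .snapshot_id();

    let provider =
        IcebergStaticTableProvider::try_new_from_table_snapshot(table, snapshot_id).await?;

    let ctx = SessionContext::new();
    ctx.register_table("table_at_snapshot", Arc::new(provider))?;
    ctx.sql("SELECT count(*) FROM table_at_snapshot").await?.show().await?;
    Ok(())
}
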
diff --git a/crates/integrations/datafusion/src/table/table_provider_factory.rs b/crates/integrations/datafusion/src/table/table_provider_factory.rs
index e8e87dd31..8c0c8e90d 100644
--- a/crates/integrations/datafusion/src/table/table_provider_factory.rs
+++ b/crates/integrations/datafusion/src/table/table_provider_factory.rs
@@ -24,12 +24,11 @@ use datafusion::catalog::{Session, TableProvider, TableProviderFactory};
use datafusion::error::Result as DFResult;
use datafusion::logical_expr::CreateExternalTable;
use datafusion::sql::TableReference;
-use iceberg::arrow::schema_to_arrow_schema;
use iceberg::io::FileIO;
use iceberg::table::StaticTable;
use iceberg::{Error, ErrorKind, Result, TableIdent};
-use super::IcebergTableProvider;
+use super::IcebergStaticTableProvider;
use crate::to_datafusion_error;
/// A factory that implements DataFusion's `TableProviderFactory` to create `IcebergTableProvider` instances.
@@ -126,10 +125,11 @@ impl TableProviderFactory for IcebergTableProviderFactory {
.map_err(to_datafusion_error)?
.into_table();
- let schema = schema_to_arrow_schema(table.metadata().current_schema())
+ let provider = IcebergStaticTableProvider::try_new_from_table(table)
+ .await
.map_err(to_datafusion_error)?;
- Ok(Arc::new(IcebergTableProvider::new(table, Arc::new(schema))))
+ Ok(Arc::new(provider))
}
}
diff --git a/crates/integrations/datafusion/tests/integration_datafusion_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_test.rs
index fdf5b17d1..3ad84f383 100644
--- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs
+++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs
@@ -492,10 +492,6 @@ async fn test_insert_into() -> Result<()> {
.unwrap();
assert_eq!(rows_inserted.value(0), 2);
- // Refresh context to avoid getting stale table
- let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
- ctx.register_catalog("catalog", catalog);
-
// Query the table to verify the inserted data
let df = ctx
.sql("SELECT * FROM catalog.test_insert_into.my_table")
@@ -650,10 +646,6 @@ async fn test_insert_into_nested() -> Result<()> {
.unwrap();
assert_eq!(rows_inserted.value(0), 2);
- // Refresh context to avoid getting stale table
- let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
- ctx.register_catalog("catalog", catalog);
-
// Query the table to verify the inserted data
let df = ctx
.sql("SELECT * FROM catalog.test_insert_nested.nested_table ORDER BY
id")
@@ -880,10 +872,6 @@ async fn test_insert_into_partitioned() -> Result<()> {
.unwrap();
assert_eq!(rows_inserted.value(0), 5);
- // Refresh catalog to get updated table
- let catalog =
Arc::new(IcebergCatalogProvider::try_new(client.clone()).await?);
- ctx.register_catalog("catalog", catalog);
-
// Query the table to verify data
let df = ctx
.sql("SELECT * FROM catalog.test_partitioned_write.partitioned_table
ORDER BY id")