This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 5cf85dab9 feat(datafusion): Support `CREATE TABLE` for DataFusion 
(#1972)
5cf85dab9 is described below

commit 5cf85dab903e171a077481597be1e49a1159715a
Author: Shawn Chang <[email protected]>
AuthorDate: Tue Jan 13 16:00:03 2026 -0800

    feat(datafusion): Support `CREATE TABLE` for DataFusion (#1972)
    
    ## Which issue does this PR close?
    
    - Closes #1905
    
    ## What changes are included in this PR?
    - Implement `register_table` in `IcebergSchemaProvider`
    - Added a new slt: `create_table.slt`
    - Updated existing slts
    
    Note: This does NOT cover the syntax `CREATE TABLE ... AS VALUES`
    
    ## Are these changes tested?
    Yes
---
 Cargo.lock                                         |   1 +
 Cargo.toml                                         |   1 +
 crates/integrations/datafusion/Cargo.toml          |   1 +
 crates/integrations/datafusion/src/schema.rs       | 272 +++++++++++++++++++--
 crates/sqllogictest/src/engine/datafusion.rs       |  31 +--
 .../sqllogictest/testdata/schedules/df_test.toml   |   4 +
 .../testdata/slts/df_test/create_table.slt         |  90 +++++++
 .../testdata/slts/df_test/insert_into.slt          |   4 +
 .../testdata/slts/df_test/show_tables.slt          |   3 -
 9 files changed, 358 insertions(+), 49 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 51af571d5..3de43e685 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3525,6 +3525,7 @@ version = "0.8.0"
 dependencies = [
  "anyhow",
  "async-trait",
+ "dashmap",
  "datafusion",
  "expect-test",
  "futures",
diff --git a/Cargo.toml b/Cargo.toml
index 56cd1801c..517bfa36e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -62,6 +62,7 @@ bytes = "1.10"
 chrono = "0.4.41"
 clap = { version = "4.5.48", features = ["derive", "cargo"] }
 ctor = "0.2.8"
+dashmap = "6"
 datafusion = "51.0"
 datafusion-cli = "51.0"
 datafusion-sqllogictest = "51.0"
diff --git a/crates/integrations/datafusion/Cargo.toml 
b/crates/integrations/datafusion/Cargo.toml
index 0ee1738b4..fd3e489e4 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -31,6 +31,7 @@ repository = { workspace = true }
 [dependencies]
 anyhow = { workspace = true }
 async-trait = { workspace = true }
+dashmap = { workspace = true }
 datafusion = { workspace = true }
 futures = { workspace = true }
 iceberg = { workspace = true }
diff --git a/crates/integrations/datafusion/src/schema.rs 
b/crates/integrations/datafusion/src/schema.rs
index 31bbdbd67..022964ba6 100644
--- a/crates/integrations/datafusion/src/schema.rs
+++ b/crates/integrations/datafusion/src/schema.rs
@@ -16,16 +16,20 @@
 // under the License.
 
 use std::any::Any;
-use std::collections::HashMap;
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use dashmap::DashMap;
 use datafusion::catalog::SchemaProvider;
 use datafusion::datasource::TableProvider;
 use datafusion::error::{DataFusionError, Result as DFResult};
+use datafusion::execution::TaskContext;
+use datafusion::prelude::SessionContext;
+use futures::StreamExt;
 use futures::future::try_join_all;
+use iceberg::arrow::arrow_schema_to_schema_auto_assign_ids;
 use iceberg::inspect::MetadataTableType;
-use iceberg::{Catalog, NamespaceIdent, Result};
+use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, 
TableCreation};
 
 use crate::table::IcebergTableProvider;
 use crate::to_datafusion_error;
@@ -34,10 +38,15 @@ use crate::to_datafusion_error;
 /// access to table providers within a specific namespace.
 #[derive(Debug)]
 pub(crate) struct IcebergSchemaProvider {
-    /// A `HashMap` where keys are table names
+    /// Reference to the Iceberg catalog
+    catalog: Arc<dyn Catalog>,
+    /// The namespace this schema represents
+    namespace: NamespaceIdent,
+    /// A concurrent map where keys are table names
     /// and values are dynamic references to objects implementing the
     /// [`TableProvider`] trait.
-    tables: HashMap<String, Arc<IcebergTableProvider>>,
+    /// Wrapped in Arc to allow sharing across async boundaries in 
register_table.
+    tables: Arc<DashMap<String, Arc<IcebergTableProvider>>>,
 }
 
 impl IcebergSchemaProvider {
@@ -71,13 +80,16 @@ impl IcebergSchemaProvider {
         )
         .await?;
 
-        let tables: HashMap<String, Arc<IcebergTableProvider>> = table_names
-            .into_iter()
-            .zip(providers.into_iter())
-            .map(|(name, provider)| (name, Arc::new(provider)))
-            .collect();
+        let tables = Arc::new(DashMap::new());
+        for (name, provider) in 
table_names.into_iter().zip(providers.into_iter()) {
+            tables.insert(name, Arc::new(provider));
+        }
 
-        Ok(IcebergSchemaProvider { tables })
+        Ok(IcebergSchemaProvider {
+            catalog: client,
+            namespace,
+            tables,
+        })
     }
 }
 
@@ -89,13 +101,16 @@ impl SchemaProvider for IcebergSchemaProvider {
 
     fn table_names(&self) -> Vec<String> {
         self.tables
-            .keys()
-            .flat_map(|table_name| {
+            .iter()
+            .flat_map(|entry| {
+                let table_name = entry.key().clone();
                 [table_name.clone()]
                     .into_iter()
-                    
.chain(MetadataTableType::all_types().map(|metadata_table_name| {
-                        format!("{}${}", table_name.clone(), 
metadata_table_name.as_str())
-                    }))
+                    .chain(
+                        MetadataTableType::all_types().map(move 
|metadata_table_name| {
+                            format!("{}${}", table_name, 
metadata_table_name.as_str())
+                        }),
+                    )
             })
             .collect()
     }
@@ -127,7 +142,230 @@ impl SchemaProvider for IcebergSchemaProvider {
         Ok(self
             .tables
             .get(name)
-            .cloned()
-            .map(|t| t as Arc<dyn TableProvider>))
+            .map(|entry| entry.value().clone() as Arc<dyn TableProvider>))
+    }
+
+    fn register_table(
+        &self,
+        name: String,
+        table: Arc<dyn TableProvider>,
+    ) -> DFResult<Option<Arc<dyn TableProvider>>> {
+        // Check if table already exists
+        if self.table_exist(name.as_str()) {
+            return Err(DataFusionError::Execution(format!(
+                "Table {name} already exists"
+            )));
+        }
+
+        // Convert DataFusion schema to Iceberg schema
+        // DataFusion schemas don't have field IDs, so we use the function 
that assigns them automatically
+        let df_schema = table.schema();
+        let iceberg_schema = 
arrow_schema_to_schema_auto_assign_ids(df_schema.as_ref())
+            .map_err(to_datafusion_error)?;
+
+        // Create the table in the Iceberg catalog
+        let table_creation = TableCreation::builder()
+            .name(name.clone())
+            .schema(iceberg_schema)
+            .build();
+
+        let catalog = self.catalog.clone();
+        let namespace = self.namespace.clone();
+        let tables = self.tables.clone();
+        let name_clone = name.clone();
+
+        // Use tokio's spawn_blocking to handle the async work on a blocking 
thread pool
+        let result = tokio::task::spawn_blocking(move || {
+            // Obtain a handle to the current Tokio runtime to drive the async work
+            let rt = tokio::runtime::Handle::current();
+            rt.block_on(async move {
+                // Verify the input table is empty - CREATE TABLE only accepts 
schema definition
+                ensure_table_is_empty(&table)
+                    .await
+                    .map_err(to_datafusion_error)?;
+
+                catalog
+                    .create_table(&namespace, table_creation)
+                    .await
+                    .map_err(to_datafusion_error)?;
+
+                // Create a new table provider using the catalog reference
+                let table_provider = IcebergTableProvider::try_new(
+                    catalog.clone(),
+                    namespace.clone(),
+                    name_clone.clone(),
+                )
+                .await
+                .map_err(to_datafusion_error)?;
+
+                // Store the new table provider
+                tables.insert(name_clone, Arc::new(table_provider));
+
+                Ok(None)
+            })
+        });
+
+        // Block on the spawned task to get the result.
+        // NOTE(review): this still blocks the calling thread while waiting on the 
+        // JoinHandle; assumes a multi-threaded runtime — verify no deadlock on a current-thread runtime
+        futures::executor::block_on(result).map_err(|e| {
+            DataFusionError::Execution(format!("Failed to create Iceberg 
table: {e}"))
+        })?
+    }
+}
+
+/// Verifies that a table provider contains no data by scanning with LIMIT 1.
+/// Returns an error if the table has any rows.
+async fn ensure_table_is_empty(table: &Arc<dyn TableProvider>) -> Result<()> {
+    let session_ctx = SessionContext::new();
+    let exec_plan = table
+        .scan(&session_ctx.state(), None, &[], Some(1))
+        .await
+        .map_err(|e| Error::new(ErrorKind::Unexpected, format!("Failed to scan 
table: {e}")))?;
+
+    let task_ctx = Arc::new(TaskContext::default());
+    let stream = exec_plan.execute(0, task_ctx).map_err(|e| {
+        Error::new(
+            ErrorKind::Unexpected,
+            format!("Failed to execute scan: {e}"),
+        )
+    })?;
+
+    let batches: Vec<_> = stream.collect().await;
+    let has_data = batches
+        .into_iter()
+        .filter_map(|r| r.ok())
+        .any(|batch| batch.num_rows() > 0);
+
+    if has_data {
+        return Err(Error::new(
+            ErrorKind::Unexpected,
+            "register_table does not support tables with data.",
+        ));
+    }
+
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+    use std::sync::Arc;
+
+    use datafusion::arrow::array::{Int32Array, StringArray};
+    use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
+    use datafusion::arrow::record_batch::RecordBatch;
+    use datafusion::datasource::MemTable;
+    use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
+    use iceberg::{Catalog, CatalogBuilder, NamespaceIdent};
+    use tempfile::TempDir;
+
+    use super::*;
+
+    async fn create_test_schema_provider() -> (IcebergSchemaProvider, TempDir) 
{
+        let temp_dir = TempDir::new().unwrap();
+        let warehouse_path = temp_dir.path().to_str().unwrap().to_string();
+
+        let catalog = MemoryCatalogBuilder::default()
+            .load(
+                "memory",
+                HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), 
warehouse_path.clone())]),
+            )
+            .await
+            .unwrap();
+
+        let namespace = NamespaceIdent::new("test_ns".to_string());
+        catalog
+            .create_namespace(&namespace, HashMap::new())
+            .await
+            .unwrap();
+
+        let provider = IcebergSchemaProvider::try_new(Arc::new(catalog), 
namespace)
+            .await
+            .unwrap();
+
+        (provider, temp_dir)
+    }
+
+    #[tokio::test]
+    async fn test_register_table_with_data_fails() {
+        let (schema_provider, _temp_dir) = create_test_schema_provider().await;
+
+        // Create a MemTable with data
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, true),
+        ]));
+
+        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![
+            Arc::new(Int32Array::from(vec![1, 2, 3])),
+            Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
+        ])
+        .unwrap();
+
+        let mem_table = MemTable::try_new(arrow_schema, 
vec![vec![batch]]).unwrap();
+
+        // Attempt to register the table with data - should fail
+        let result = schema_provider.register_table("test_table".to_string(), 
Arc::new(mem_table));
+
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        assert!(
+            err.to_string()
+                .contains("register_table does not support tables with data."),
+            "Expected error about tables with data, got: {err}",
+        );
+    }
+
+    #[tokio::test]
+    async fn test_register_empty_table_succeeds() {
+        let (schema_provider, _temp_dir) = create_test_schema_provider().await;
+
+        // Create an empty MemTable (schema only, no data rows)
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, true),
+        ]));
+
+        // Create an empty batch (0 rows) - MemTable requires at least one 
partition
+        let empty_batch = RecordBatch::new_empty(arrow_schema.clone());
+        let mem_table = MemTable::try_new(arrow_schema, 
vec![vec![empty_batch]]).unwrap();
+
+        // Attempt to register the empty table - should succeed
+        let result = schema_provider.register_table("empty_table".to_string(), 
Arc::new(mem_table));
+
+        assert!(result.is_ok(), "Expected success, got: {result:?}");
+
+        // Verify the table was registered
+        assert!(schema_provider.table_exist("empty_table"));
+    }
+
+    #[tokio::test]
+    async fn test_register_duplicate_table_fails() {
+        let (schema_provider, _temp_dir) = create_test_schema_provider().await;
+
+        // Create empty MemTables
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
+            "id",
+            DataType::Int32,
+            false,
+        )]));
+
+        let empty_batch1 = RecordBatch::new_empty(arrow_schema.clone());
+        let empty_batch2 = RecordBatch::new_empty(arrow_schema.clone());
+        let mem_table1 = MemTable::try_new(arrow_schema.clone(), 
vec![vec![empty_batch1]]).unwrap();
+        let mem_table2 = MemTable::try_new(arrow_schema, 
vec![vec![empty_batch2]]).unwrap();
+
+        // Register first table - should succeed
+        let result1 = schema_provider.register_table("dup_table".to_string(), 
Arc::new(mem_table1));
+        assert!(result1.is_ok());
+
+        // Register second table with same name - should fail
+        let result2 = schema_provider.register_table("dup_table".to_string(), 
Arc::new(mem_table2));
+        assert!(result2.is_err());
+        let err = result2.unwrap_err();
+        assert!(
+            err.to_string().contains("already exists"),
+            "Expected error about table already existing, got: {err}",
+        );
     }
 }
diff --git a/crates/sqllogictest/src/engine/datafusion.rs 
b/crates/sqllogictest/src/engine/datafusion.rs
index e9f93287d..487d8dc97 100644
--- a/crates/sqllogictest/src/engine/datafusion.rs
+++ b/crates/sqllogictest/src/engine/datafusion.rs
@@ -93,8 +93,7 @@ impl DataFusionEngine {
         let namespace = NamespaceIdent::new("default".to_string());
         catalog.create_namespace(&namespace, HashMap::new()).await?;
 
-        // Create test tables
-        Self::create_unpartitioned_table(&catalog, &namespace).await?;
+        // Create partitioned test table (unpartitioned tables are now created 
via SQL)
         Self::create_partitioned_table(&catalog, &namespace).await?;
 
         Ok(Arc::new(
@@ -102,35 +101,9 @@ impl DataFusionEngine {
         ))
     }
 
-    /// Create an unpartitioned test table with id and name columns
-    /// TODO: this can be removed when we support CREATE TABLE
-    async fn create_unpartitioned_table(
-        catalog: &impl Catalog,
-        namespace: &NamespaceIdent,
-    ) -> anyhow::Result<()> {
-        let schema = Schema::builder()
-            .with_fields(vec![
-                NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
-                NestedField::optional(2, "name", 
Type::Primitive(PrimitiveType::String)).into(),
-            ])
-            .build()?;
-
-        catalog
-            .create_table(
-                namespace,
-                TableCreation::builder()
-                    .name("test_unpartitioned_table".to_string())
-                    .schema(schema)
-                    .build(),
-            )
-            .await?;
-
-        Ok(())
-    }
-
     /// Create a partitioned test table with id, category, and value columns
     /// Partitioned by category using identity transform
-    /// TODO: this can be removed when we support CREATE TABLE
+    /// TODO: this can be removed when we support CREATE EXTERNAL TABLE
     async fn create_partitioned_table(
         catalog: &impl Catalog,
         namespace: &NamespaceIdent,
diff --git a/crates/sqllogictest/testdata/schedules/df_test.toml 
b/crates/sqllogictest/testdata/schedules/df_test.toml
index df5e638d5..1d7f42c8d 100644
--- a/crates/sqllogictest/testdata/schedules/df_test.toml
+++ b/crates/sqllogictest/testdata/schedules/df_test.toml
@@ -22,6 +22,10 @@ df = { type = "datafusion" }
 engine = "df"
 slt = "df_test/show_tables.slt"
 
+[[steps]]
+engine = "df"
+slt = "df_test/create_table.slt"
+
 [[steps]]
 engine = "df"
 slt = "df_test/insert_into.slt"
diff --git a/crates/sqllogictest/testdata/slts/df_test/create_table.slt 
b/crates/sqllogictest/testdata/slts/df_test/create_table.slt
new file mode 100644
index 000000000..2eab1b6ba
--- /dev/null
+++ b/crates/sqllogictest/testdata/slts/df_test/create_table.slt
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test CREATE TABLE with explicit schema
+statement ok
+CREATE TABLE default.default.empty_table (id INT NOT NULL, name STRING)
+
+# Verify the empty table exists and has correct schema
+query IT rowsort
+SELECT * FROM default.default.empty_table
+----
+
+# Insert data into the created table
+query I
+INSERT INTO default.default.empty_table VALUES (1, 'Alice')
+----
+1
+
+# Verify the inserted data
+query IT rowsort
+SELECT * FROM default.default.empty_table
+----
+1 Alice
+
+# Insert multiple rows
+query I
+INSERT INTO default.default.empty_table VALUES (2, 'Bob'), (3, 'Charlie')
+----
+2
+
+# Verify all rows
+query IT rowsort
+SELECT * FROM default.default.empty_table
+----
+1 Alice
+2 Bob
+3 Charlie
+
+# Test CREATE TABLE with different column types
+statement ok
+CREATE TABLE default.default.typed_table (id BIGINT NOT NULL, value DOUBLE, 
flag BOOLEAN)
+
+# Verify the typed table exists
+query IDT rowsort
+SELECT * FROM default.default.typed_table
+----
+
+# Insert data with different types
+query I
+INSERT INTO default.default.typed_table VALUES (100, 3.14, true), (200, 2.71, 
false)
+----
+2
+
+# Verify typed data
+query IDT rowsort
+SELECT * FROM default.default.typed_table
+----
+100 3.14 true
+200 2.71 false
+
+# Test CREATE TABLE with nullable columns
+statement ok
+CREATE TABLE default.default.nullable_table (id INT NOT NULL, optional_name 
STRING)
+
+# Insert with NULL value
+query I
+INSERT INTO default.default.nullable_table VALUES (1, 'Value'), (2, NULL)
+----
+2
+
+# Verify NULL handling
+query IT rowsort
+SELECT * FROM default.default.nullable_table
+----
+1 Value
+2 NULL
diff --git a/crates/sqllogictest/testdata/slts/df_test/insert_into.slt 
b/crates/sqllogictest/testdata/slts/df_test/insert_into.slt
index 2ba33afcd..1e0784432 100644
--- a/crates/sqllogictest/testdata/slts/df_test/insert_into.slt
+++ b/crates/sqllogictest/testdata/slts/df_test/insert_into.slt
@@ -15,6 +15,10 @@
 # specific language governing permissions and limitations
 # under the License.
 
+# Create unpartitioned test table
+statement ok
+CREATE TABLE default.default.test_unpartitioned_table (id INT NOT NULL, name 
STRING)
+
 # Verify the table is initially empty
 query IT rowsort
 SELECT * FROM default.default.test_unpartitioned_table
diff --git a/crates/sqllogictest/testdata/slts/df_test/show_tables.slt 
b/crates/sqllogictest/testdata/slts/df_test/show_tables.slt
index c5da5f627..770072f9d 100644
--- a/crates/sqllogictest/testdata/slts/df_test/show_tables.slt
+++ b/crates/sqllogictest/testdata/slts/df_test/show_tables.slt
@@ -28,9 +28,6 @@ datafusion information_schema views VIEW
 default default test_partitioned_table BASE TABLE
 default default test_partitioned_table$manifests BASE TABLE
 default default test_partitioned_table$snapshots BASE TABLE
-default default test_unpartitioned_table BASE TABLE
-default default test_unpartitioned_table$manifests BASE TABLE
-default default test_unpartitioned_table$snapshots BASE TABLE
 default information_schema columns VIEW
 default information_schema df_settings VIEW
 default information_schema parameters VIEW

Reply via email to