CTTY commented on code in PR #1972:
URL: https://github.com/apache/iceberg-rust/pull/1972#discussion_r2684090755


##########
crates/integrations/datafusion/src/schema.rs:
##########
@@ -127,7 +139,61 @@ impl SchemaProvider for IcebergSchemaProvider {
         Ok(self
             .tables
             .get(name)
-            .cloned()
-            .map(|t| t as Arc<dyn TableProvider>))
+            .map(|entry| entry.value().clone() as Arc<dyn TableProvider>))
+    }
+
+    fn register_table(
+        &self,
+        name: String,
+        table: Arc<dyn TableProvider>,
+    ) -> DFResult<Option<Arc<dyn TableProvider>>> {
+        // Convert DataFusion schema to Iceberg schema
+        // DataFusion schemas don't have field IDs, so we use the function that assigns them automatically
+        let df_schema = table.schema();
+        let iceberg_schema = arrow_schema_to_schema_auto_assign_ids(df_schema.as_ref())
+            .map_err(to_datafusion_error)?;
+
+        // Create the table in the Iceberg catalog
+        let table_creation = TableCreation::builder()
+            .name(name.clone())
+            .schema(iceberg_schema)
+            .build();
+
+        let catalog = self.catalog.clone();
+        let namespace = self.namespace.clone();
+        let tables = self.tables.clone();
+        let name_clone = name.clone();
+
+        // Use tokio's spawn_blocking to handle the async work on a blocking thread pool
+        let result = tokio::task::spawn_blocking(move || {
+            // Create a new runtime handle to execute the async work
+            let rt = tokio::runtime::Handle::current();
+            rt.block_on(async move {
+                catalog
+                    .create_table(&namespace, table_creation)
+                    .await
+                    .map_err(to_datafusion_error)?;
+
+                // Create a new table provider using the catalog reference
+                let table_provider = IcebergTableProvider::try_new(
+                    catalog.clone(),
+                    namespace.clone(),
+                    name_clone.clone(),
+                )
+                .await
+                .map_err(to_datafusion_error)?;
+
+                // Store the new table provider
+                let old_table = tables.insert(name_clone, Arc::new(table_provider));

Review Comment:
   Good catch! Will fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to