This is an automated email from the ASF dual-hosted git repository.

blackmwk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new b9b6c7e01 feat!(runtime): Support custom Runtime in Catalog (#2308)
b9b6c7e01 is described below

commit b9b6c7e01a884d73f357aeba12c54f82dd1a24f7
Author: Shawn Chang <[email protected]>
AuthorDate: Sat May 16 02:25:19 2026 -0700

    feat!(runtime): Support custom Runtime in Catalog (#2308)
    
    ## Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax. For example
    `Closes #123` indicates that this PR will close issue #123.
    -->
    
    - Closes #1958
    
    ## What changes are included in this PR?
    - Introduced `RuntimeHandle`.
    - Refactored `Runtime` to compose two `RuntimeHandle`s: one for IO-bound
      work (`io()`) and one for CPU-bound work (`cpu()`).
    - Added `with_runtime` to the `CatalogBuilder` trait.
    
    
    <!--
    Provide a summary of the modifications in this PR. List the main changes
    such as new features, bug fixes, refactoring, or any other updates.
    -->
    
    ## Are these changes tested?
    Yes
    <!--
    Specify what test covers (unit test, integration test, etc.).
    
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    -->
    
    ---------
    
    Co-authored-by: blackmwk <[email protected]>
---
 Cargo.toml                                         |   2 +-
 crates/catalog/glue/src/catalog.rs                 |  21 +-
 crates/catalog/hms/src/catalog.rs                  |  43 ++-
 crates/catalog/rest/Cargo.toml                     |   2 +-
 crates/catalog/rest/src/catalog.rs                 |  63 ++++-
 crates/catalog/s3tables/src/catalog.rs             |  24 +-
 crates/catalog/sql/src/catalog.rs                  |  21 +-
 crates/iceberg/Cargo.toml                          |   2 +-
 .../src/arrow/caching_delete_file_loader.rs        |  22 +-
 crates/iceberg/src/arrow/delete_filter.rs          |  19 +-
 crates/iceberg/src/arrow/reader/mod.rs             |   6 +-
 crates/iceberg/src/arrow/reader/pipeline.rs        |   5 +-
 .../iceberg/src/arrow/reader/positional_deletes.rs |   7 +-
 crates/iceberg/src/arrow/reader/projection.rs      |  22 +-
 crates/iceberg/src/arrow/reader/row_filter.rs      |   9 +-
 crates/iceberg/src/catalog/memory/catalog.rs       |  24 +-
 crates/iceberg/src/catalog/mod.rs                  |  10 +
 crates/iceberg/src/delete_file_index.rs            |   6 +-
 crates/iceberg/src/io/object_cache.rs              |   2 +
 crates/iceberg/src/lib.rs                          |   1 +
 crates/iceberg/src/runtime/mod.rs                  | 293 +++++++++++++++++++--
 crates/iceberg/src/scan/mod.rs                     | 157 +++++++----
 crates/iceberg/src/spec/table_metadata_builder.rs  |   3 +
 crates/iceberg/src/table.rs                        |  31 ++-
 crates/iceberg/src/test_utils.rs                   |  22 ++
 crates/iceberg/src/transaction/mod.rs              |   4 +
 crates/iceberg/src/transaction/update_schema.rs    |   1 +
 .../datafusion/src/physical_plan/project.rs        |   4 +
 .../datafusion/src/physical_plan/repartition.rs    |   9 +
 crates/sqllogictest/Cargo.toml                     |   2 +-
 crates/storage/opendal/Cargo.toml                  |   2 +-
 31 files changed, 686 insertions(+), 153 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 793bb49d8..d23e013f1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -126,7 +126,7 @@ stacker = "0.1.20"
 strum = "0.27.2"
 tempfile = "3.18"
 thrift = "0.17.0"
-tokio = { version = "1.47", default-features = false }
+tokio = { version = "1.47", default-features = false, features = ["sync", 
"rt-multi-thread"] }
 toml = "0.8"
 tracing = "0.1.41"
 tracing-subscriber = "0.3.20"
diff --git a/crates/catalog/glue/src/catalog.rs 
b/crates/catalog/glue/src/catalog.rs
index 5b3ccf3b3..c51f6a6a8 100644
--- a/crates/catalog/glue/src/catalog.rs
+++ b/crates/catalog/glue/src/catalog.rs
@@ -33,7 +33,7 @@ use iceberg::spec::{TableMetadata, TableMetadataBuilder};
 use iceberg::table::Table;
 use iceberg::{
     Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, 
NamespaceIdent, Result,
-    TableCommit, TableCreation, TableIdent,
+    Runtime, TableCommit, TableCreation, TableIdent,
 };
 use iceberg_storage_opendal::OpenDalStorageFactory;
 
@@ -58,6 +58,7 @@ pub const GLUE_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
 pub struct GlueCatalogBuilder {
     config: GlueCatalogConfig,
     storage_factory: Option<Arc<dyn StorageFactory>>,
+    runtime: Option<Runtime>,
 }
 
 impl Default for GlueCatalogBuilder {
@@ -71,6 +72,7 @@ impl Default for GlueCatalogBuilder {
                 props: HashMap::new(),
             },
             storage_factory: None,
+            runtime: None,
         }
     }
 }
@@ -83,6 +85,11 @@ impl CatalogBuilder for GlueCatalogBuilder {
         self
     }
 
+    fn with_runtime(mut self, runtime: Runtime) -> Self {
+        self.runtime = Some(runtime);
+        self
+    }
+
     fn load(
         mut self,
         name: impl Into<String>,
@@ -129,7 +136,11 @@ impl CatalogBuilder for GlueCatalogBuilder {
                 ));
             }
 
-            GlueCatalog::new(self.config, self.storage_factory).await
+            let runtime = match self.runtime {
+                Some(rt) => rt,
+                None => Runtime::try_current()?,
+            };
+            GlueCatalog::new(self.config, self.storage_factory, runtime).await
         }
     }
 }
@@ -151,6 +162,7 @@ pub struct GlueCatalog {
     config: GlueCatalogConfig,
     client: GlueClient,
     file_io: FileIO,
+    runtime: Runtime,
 }
 
 impl Debug for GlueCatalog {
@@ -166,6 +178,7 @@ impl GlueCatalog {
     async fn new(
         config: GlueCatalogConfig,
         storage_factory: Option<Arc<dyn StorageFactory>>,
+        runtime: Runtime,
     ) -> Result<Self> {
         let sdk_config = create_sdk_config(&config.props, 
config.uri.as_ref()).await;
         let mut file_io_props = config.props.clone();
@@ -214,6 +227,7 @@ impl GlueCatalog {
             config,
             client: GlueClient(client),
             file_io,
+            runtime,
         })
     }
     /// Get the catalogs `FileIO`
@@ -272,6 +286,7 @@ impl GlueCatalog {
                 NamespaceIdent::new(db_name),
                 table_name.to_owned(),
             ))
+            .runtime(self.runtime.clone())
             .build()?;
 
         Ok((table, version_id))
@@ -611,6 +626,7 @@ impl Catalog for GlueCatalog {
             .metadata_location(metadata_location_str)
             .metadata(metadata)
             .identifier(TableIdent::new(NamespaceIdent::new(db_name), 
table_name))
+            .runtime(self.runtime.clone())
             .build()
     }
 
@@ -845,6 +861,7 @@ impl Catalog for GlueCatalog {
             .metadata_location(metadata_location)
             .metadata(metadata)
             .file_io(self.file_io())
+            .runtime(self.runtime.clone())
             .build()?)
     }
 
diff --git a/crates/catalog/hms/src/catalog.rs 
b/crates/catalog/hms/src/catalog.rs
index 4a030c110..d778a3d5f 100644
--- a/crates/catalog/hms/src/catalog.rs
+++ b/crates/catalog/hms/src/catalog.rs
@@ -31,7 +31,7 @@ use iceberg::spec::{TableMetadata, TableMetadataBuilder};
 use iceberg::table::Table;
 use iceberg::{
     Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, 
NamespaceIdent, Result,
-    TableCommit, TableCreation, TableIdent,
+    Runtime, TableCommit, TableCreation, TableIdent,
 };
 use volo_thrift::MaybeException;
 
@@ -56,6 +56,7 @@ pub const HMS_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
 pub struct HmsCatalogBuilder {
     config: HmsCatalogConfig,
     storage_factory: Option<Arc<dyn StorageFactory>>,
+    runtime: Option<Runtime>,
 }
 
 impl Default for HmsCatalogBuilder {
@@ -69,6 +70,7 @@ impl Default for HmsCatalogBuilder {
                 props: HashMap::new(),
             },
             storage_factory: None,
+            runtime: None,
         }
     }
 }
@@ -81,6 +83,11 @@ impl CatalogBuilder for HmsCatalogBuilder {
         self
     }
 
+    fn with_runtime(mut self, runtime: Runtime) -> Self {
+        self.runtime = Some(runtime);
+        self
+    }
+
     fn load(
         mut self,
         name: impl Into<String>,
@@ -116,26 +123,31 @@ impl CatalogBuilder for HmsCatalogBuilder {
             })
             .collect();
 
-        let result = {
+        let result = (|| -> Result<HmsCatalog> {
             if self.config.name.is_none() {
-                Err(Error::new(
+                return Err(Error::new(
                     ErrorKind::DataInvalid,
                     "Catalog name is required",
-                ))
-            } else if self.config.address.is_empty() {
-                Err(Error::new(
+                ));
+            }
+            if self.config.address.is_empty() {
+                return Err(Error::new(
                     ErrorKind::DataInvalid,
                     "Catalog address is required",
-                ))
-            } else if self.config.warehouse.is_empty() {
-                Err(Error::new(
+                ));
+            }
+            if self.config.warehouse.is_empty() {
+                return Err(Error::new(
                     ErrorKind::DataInvalid,
                     "Catalog warehouse is required",
-                ))
-            } else {
-                HmsCatalog::new(self.config, self.storage_factory)
+                ));
             }
-        };
+            let runtime = match self.runtime {
+                Some(rt) => rt,
+                None => Runtime::try_current()?,
+            };
+            HmsCatalog::new(self.config, self.storage_factory, runtime)
+        })();
 
         std::future::ready(result)
     }
@@ -169,6 +181,7 @@ pub struct HmsCatalog {
     config: HmsCatalogConfig,
     client: HmsClient,
     file_io: FileIO,
+    runtime: Runtime,
 }
 
 impl Debug for HmsCatalog {
@@ -184,6 +197,7 @@ impl HmsCatalog {
     fn new(
         config: HmsCatalogConfig,
         storage_factory: Option<Arc<dyn StorageFactory>>,
+        runtime: Runtime,
     ) -> Result<Self> {
         let address = config
             .address
@@ -223,6 +237,7 @@ impl HmsCatalog {
             config,
             client: HmsClient(client),
             file_io,
+            runtime,
         })
     }
     /// Get the catalogs `FileIO`
@@ -529,6 +544,7 @@ impl Catalog for HmsCatalog {
             .metadata_location(metadata_location_str)
             .metadata(metadata)
             .identifier(TableIdent::new(NamespaceIdent::new(db_name), 
table_name))
+            .runtime(self.runtime.clone())
             .build()
     }
 
@@ -567,6 +583,7 @@ impl Catalog for HmsCatalog {
                 NamespaceIdent::new(db_name),
                 table.name.clone(),
             ))
+            .runtime(self.runtime.clone())
             .build()
     }
 
diff --git a/crates/catalog/rest/Cargo.toml b/crates/catalog/rest/Cargo.toml
index 40dd70a95..f21f7609a 100644
--- a/crates/catalog/rest/Cargo.toml
+++ b/crates/catalog/rest/Cargo.toml
@@ -38,7 +38,7 @@ reqwest = { workspace = true }
 serde = { workspace = true }
 serde_derive = { workspace = true }
 serde_json = { workspace = true }
-tokio = { workspace = true, features = ["sync"] }
+tokio = { workspace = true }
 typed-builder = { workspace = true }
 uuid = { workspace = true, features = ["v4"] }
 
diff --git a/crates/catalog/rest/src/catalog.rs 
b/crates/catalog/rest/src/catalog.rs
index 7d5df24d5..0626ce506 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -26,8 +26,8 @@ use async_trait::async_trait;
 use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
 use iceberg::table::Table;
 use iceberg::{
-    Catalog, CatalogBuilder, Error, ErrorKind, Namespace, NamespaceIdent, 
Result, TableCommit,
-    TableCreation, TableIdent,
+    Catalog, CatalogBuilder, Error, ErrorKind, Namespace, NamespaceIdent, 
Result, Runtime,
+    TableCommit, TableCreation, TableIdent,
 };
 use itertools::Itertools;
 use reqwest::header::{
@@ -62,6 +62,7 @@ const PATH_V1: &str = "v1";
 pub struct RestCatalogBuilder {
     config: RestCatalogConfig,
     storage_factory: Option<Arc<dyn StorageFactory>>,
+    runtime: Option<Runtime>,
 }
 
 impl Default for RestCatalogBuilder {
@@ -75,6 +76,7 @@ impl Default for RestCatalogBuilder {
                 client: None,
             },
             storage_factory: None,
+            runtime: None,
         }
     }
 }
@@ -87,6 +89,11 @@ impl CatalogBuilder for RestCatalogBuilder {
         self
     }
 
+    fn with_runtime(mut self, runtime: Runtime) -> Self {
+        self.runtime = Some(runtime);
+        self
+    }
+
     fn load(
         mut self,
         name: impl Into<String>,
@@ -123,7 +130,8 @@ impl CatalogBuilder for RestCatalogBuilder {
                     "Catalog uri is required",
                 ))
             } else {
-                Ok(RestCatalog::new(self.config, self.storage_factory))
+                let runtime = self.runtime.unwrap_or_else(Runtime::current);
+                Ok(RestCatalog::new(self.config, self.storage_factory, 
runtime))
             }
         };
 
@@ -351,15 +359,21 @@ pub struct RestCatalog {
     ctx: OnceCell<RestContext>,
     /// Storage factory for creating FileIO instances.
     storage_factory: Option<Arc<dyn StorageFactory>>,
+    runtime: Runtime,
 }
 
 impl RestCatalog {
     /// Creates a `RestCatalog` from a [`RestCatalogConfig`].
-    fn new(config: RestCatalogConfig, storage_factory: Option<Arc<dyn 
StorageFactory>>) -> Self {
+    fn new(
+        config: RestCatalogConfig,
+        storage_factory: Option<Arc<dyn StorageFactory>>,
+        runtime: Runtime,
+    ) -> Self {
         Self {
             user_config: config,
             ctx: OnceCell::new(),
             storage_factory,
+            runtime,
         }
     }
 
@@ -790,7 +804,8 @@ impl Catalog for RestCatalog {
         let table_builder = Table::builder()
             .identifier(table_ident.clone())
             .file_io(file_io)
-            .metadata(response.metadata);
+            .metadata(response.metadata)
+            .runtime(self.runtime.clone());
 
         if let Some(metadata_location) = response.metadata_location {
             table_builder.metadata_location(metadata_location).build()
@@ -846,7 +861,8 @@ impl Catalog for RestCatalog {
         let table_builder = Table::builder()
             .identifier(table_ident.clone())
             .file_io(file_io)
-            .metadata(response.metadata);
+            .metadata(response.metadata)
+            .runtime(self.runtime.clone());
 
         if let Some(metadata_location) = response.metadata_location {
             table_builder.metadata_location(metadata_location).build()
@@ -982,6 +998,7 @@ impl Catalog for RestCatalog {
             .file_io(file_io)
             .metadata(response.metadata)
             .metadata_location(metadata_location.clone())
+            .runtime(self.runtime.clone())
             .build()
     }
 
@@ -1054,6 +1071,7 @@ impl Catalog for RestCatalog {
             .file_io(file_io)
             .metadata(response.metadata)
             .metadata_location(response.metadata_location)
+            .runtime(self.runtime.clone())
             .build()
     }
 }
@@ -1071,6 +1089,7 @@ mod tests {
         SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, 
Type,
         UnboundPartitionField, UnboundPartitionSpec,
     };
+    use iceberg::test_utils::test_runtime;
     use iceberg::transaction::{ApplyTransactionAction, Transaction};
     use mockito::{Mock, Server, ServerGuard};
     use serde_json::json;
@@ -1099,6 +1118,7 @@ mod tests {
         let catalog = RestCatalog::new(
             RestCatalogConfig::builder().uri(server.url()).build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         assert_eq!(
@@ -1173,6 +1193,7 @@ mod tests {
                 .props(props)
                 .build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         let token = catalog.context().await.unwrap().client.token().await;
@@ -1220,6 +1241,7 @@ mod tests {
                 .props(props)
                 .build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         let token = catalog.context().await.unwrap().client.token().await;
@@ -1244,6 +1266,7 @@ mod tests {
                 .props(props)
                 .build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         let token = catalog.context().await.unwrap().client.token().await;
@@ -1275,6 +1298,7 @@ mod tests {
                 .props(props)
                 .build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         let token = catalog.context().await.unwrap().client.token().await;
@@ -1306,6 +1330,7 @@ mod tests {
                 .props(props)
                 .build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         let token = catalog.context().await.unwrap().client.token().await;
@@ -1337,6 +1362,7 @@ mod tests {
                 .props(props)
                 .build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         let token = catalog.context().await.unwrap().client.token().await;
@@ -1450,6 +1476,7 @@ mod tests {
                 .props(props)
                 .build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         let token = catalog.context().await.unwrap().client.token().await;
@@ -1497,6 +1524,7 @@ mod tests {
         let catalog = RestCatalog::new(
             RestCatalogConfig::builder().uri(server.url()).build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         let _namespaces = catalog.list_namespaces(None).await.unwrap();
@@ -1527,6 +1555,7 @@ mod tests {
         let catalog = RestCatalog::new(
             RestCatalogConfig::builder().uri(server.url()).build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         let namespaces = catalog.list_namespaces(None).await.unwrap();
@@ -1578,6 +1607,7 @@ mod tests {
         let catalog = RestCatalog::new(
             RestCatalogConfig::builder().uri(server.url()).build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         let namespaces = catalog.list_namespaces(None).await.unwrap();
@@ -1677,6 +1707,7 @@ mod tests {
         let catalog = RestCatalog::new(
             RestCatalogConfig::builder().uri(server.url()).build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         let namespaces = catalog.list_namespaces(None).await.unwrap();
@@ -1730,6 +1761,7 @@ mod tests {
         let catalog = RestCatalog::new(
             RestCatalogConfig::builder().uri(server.url()).build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         let namespaces = catalog
@@ -1773,6 +1805,7 @@ mod tests {
         let catalog = RestCatalog::new(
             RestCatalogConfig::builder().uri(server.url()).build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         let namespaces = catalog
@@ -1806,6 +1839,7 @@ mod tests {
         let catalog = RestCatalog::new(
             RestCatalogConfig::builder().uri(server.url()).build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         assert!(
@@ -1834,6 +1868,7 @@ mod tests {
         let catalog = RestCatalog::new(
             RestCatalogConfig::builder().uri(server.url()).build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         catalog
@@ -1874,6 +1909,7 @@ mod tests {
         let catalog = RestCatalog::new(
             RestCatalogConfig::builder().uri(server.url()).build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         let tables = catalog
@@ -1942,6 +1978,7 @@ mod tests {
         let catalog = RestCatalog::new(
             RestCatalogConfig::builder().uri(server.url()).build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         let tables = catalog
@@ -2073,6 +2110,7 @@ mod tests {
         let catalog = RestCatalog::new(
             RestCatalogConfig::builder().uri(server.url()).build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         let tables = catalog
@@ -2117,6 +2155,7 @@ mod tests {
         let catalog = RestCatalog::new(
             RestCatalogConfig::builder().uri(server.url()).build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         catalog
@@ -2146,6 +2185,7 @@ mod tests {
         let catalog = RestCatalog::new(
             RestCatalogConfig::builder().uri(server.url()).build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         assert!(
@@ -2177,6 +2217,7 @@ mod tests {
         let catalog = RestCatalog::new(
             RestCatalogConfig::builder().uri(server.url()).build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         catalog
@@ -2211,6 +2252,7 @@ mod tests {
         let catalog = RestCatalog::new(
             RestCatalogConfig::builder().uri(server.url()).build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         let table = catalog
@@ -2328,6 +2370,7 @@ mod tests {
         let catalog = RestCatalog::new(
             RestCatalogConfig::builder().uri(server.url()).build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         let table = catalog
@@ -2364,6 +2407,7 @@ mod tests {
         let catalog = RestCatalog::new(
             RestCatalogConfig::builder().uri(server.url()).build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         let table_creation = TableCreation::builder()
@@ -2513,6 +2557,7 @@ mod tests {
         let catalog = RestCatalog::new(
             RestCatalogConfig::builder().uri(server.url()).build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         let table_creation = TableCreation::builder()
@@ -2582,6 +2627,7 @@ mod tests {
         let catalog = RestCatalog::new(
             RestCatalogConfig::builder().uri(server.url()).build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         let table1 = {
@@ -2599,6 +2645,7 @@ mod tests {
                 .metadata_location(resp.metadata_location.unwrap())
                 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
                 .file_io(FileIO::new_with_fs())
+                .runtime(test_runtime())
                 .build()
                 .unwrap()
         };
@@ -2725,6 +2772,7 @@ mod tests {
         let catalog = RestCatalog::new(
             RestCatalogConfig::builder().uri(server.url()).build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         let table1 = {
@@ -2742,6 +2790,7 @@ mod tests {
                 .metadata_location(resp.metadata_location.unwrap())
                 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
                 .file_io(FileIO::new_with_fs())
+                .runtime(test_runtime())
                 .build()
                 .unwrap()
         };
@@ -2789,6 +2838,7 @@ mod tests {
         let catalog = RestCatalog::new(
             RestCatalogConfig::builder().uri(server.url()).build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
         let table_ident =
             TableIdent::new(NamespaceIdent::new("ns1".to_string()), 
"test1".to_string());
@@ -2840,6 +2890,7 @@ mod tests {
         let catalog = RestCatalog::new(
             RestCatalogConfig::builder().uri(server.url()).build(),
             Some(Arc::new(LocalFsStorageFactory)),
+            Runtime::current(),
         );
 
         let table_ident =
diff --git a/crates/catalog/s3tables/src/catalog.rs 
b/crates/catalog/s3tables/src/catalog.rs
index cc4344694..03f4a28de 100644
--- a/crates/catalog/s3tables/src/catalog.rs
+++ b/crates/catalog/s3tables/src/catalog.rs
@@ -32,7 +32,7 @@ use iceberg::spec::{TableMetadata, TableMetadataBuilder};
 use iceberg::table::Table;
 use iceberg::{
     Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, 
NamespaceIdent, Result,
-    TableCommit, TableCreation, TableIdent,
+    Runtime, TableCommit, TableCreation, TableIdent,
 };
 use iceberg_storage_opendal::OpenDalStorageFactory;
 
@@ -71,6 +71,7 @@ struct S3TablesCatalogConfig {
 pub struct S3TablesCatalogBuilder {
     config: S3TablesCatalogConfig,
     storage_factory: Option<Arc<dyn StorageFactory>>,
+    runtime: Option<Runtime>,
 }
 
 /// Default builder for [`S3TablesCatalog`].
@@ -85,6 +86,7 @@ impl Default for S3TablesCatalogBuilder {
                 props: HashMap::new(),
             },
             storage_factory: None,
+            runtime: None,
         }
     }
 }
@@ -132,6 +134,11 @@ impl CatalogBuilder for S3TablesCatalogBuilder {
         self
     }
 
+    fn with_runtime(mut self, runtime: Runtime) -> Self {
+        self.runtime = Some(runtime);
+        self
+    }
+
     fn load(
         mut self,
         name: impl Into<String>,
@@ -172,7 +179,11 @@ impl CatalogBuilder for S3TablesCatalogBuilder {
                     "Table bucket ARN is required",
                 ))
             } else {
-                S3TablesCatalog::new(self.config, self.storage_factory).await
+                let runtime = match self.runtime {
+                    Some(rt) => rt,
+                    None => Runtime::try_current()?,
+                };
+                S3TablesCatalog::new(self.config, self.storage_factory, 
runtime).await
             }
         }
     }
@@ -184,6 +195,7 @@ pub struct S3TablesCatalog {
     config: S3TablesCatalogConfig,
     s3tables_client: aws_sdk_s3tables::Client,
     file_io: FileIO,
+    runtime: Runtime,
 }
 
 impl S3TablesCatalog {
@@ -191,6 +203,7 @@ impl S3TablesCatalog {
     async fn new(
         config: S3TablesCatalogConfig,
         storage_factory: Option<Arc<dyn StorageFactory>>,
+        runtime: Runtime,
     ) -> Result<Self> {
         let s3tables_client = if let Some(client) = config.client.clone() {
             client
@@ -213,6 +226,7 @@ impl S3TablesCatalog {
             config,
             s3tables_client,
             file_io,
+            runtime,
         })
     }
 
@@ -245,6 +259,7 @@ impl S3TablesCatalog {
             .metadata(metadata)
             .metadata_location(metadata_location)
             .file_io(self.file_io.clone())
+            .runtime(self.runtime.clone())
             .build()?;
         Ok((table, resp.version_token))
     }
@@ -543,6 +558,7 @@ impl Catalog for S3TablesCatalog {
             .metadata_location(metadata_location_str)
             .metadata(metadata)
             .file_io(self.file_io.clone())
+            .runtime(self.runtime.clone())
             .build()?;
         Ok(table)
     }
@@ -726,7 +742,9 @@ mod tests {
             props: HashMap::new(),
         };
 
-        Ok(Some(S3TablesCatalog::new(config, None).await?))
+        Ok(Some(
+            S3TablesCatalog::new(config, None, Runtime::current()).await?,
+        ))
     }
 
     #[tokio::test]
diff --git a/crates/catalog/sql/src/catalog.rs 
b/crates/catalog/sql/src/catalog.rs
index 7e468e7e3..c7bf9d0cf 100644
--- a/crates/catalog/sql/src/catalog.rs
+++ b/crates/catalog/sql/src/catalog.rs
@@ -26,7 +26,7 @@ use iceberg::spec::{TableMetadata, TableMetadataBuilder};
 use iceberg::table::Table;
 use iceberg::{
     Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, 
NamespaceIdent, Result,
-    TableCommit, TableCreation, TableIdent,
+    Runtime, TableCommit, TableCreation, TableIdent,
 };
 use sqlx::any::{AnyPoolOptions, AnyQueryResult, AnyRow, 
install_default_drivers};
 use sqlx::{Any, AnyPool, Row, Transaction};
@@ -67,6 +67,7 @@ static TEST_BEFORE_ACQUIRE: bool = true; // Default the 
health-check of each con
 pub struct SqlCatalogBuilder {
     config: SqlCatalogConfig,
     storage_factory: Option<Arc<dyn StorageFactory>>,
+    runtime: Option<Runtime>,
 }
 
 impl Default for SqlCatalogBuilder {
@@ -80,6 +81,7 @@ impl Default for SqlCatalogBuilder {
                 props: HashMap::new(),
             },
             storage_factory: None,
+            runtime: None,
         }
     }
 }
@@ -143,6 +145,11 @@ impl CatalogBuilder for SqlCatalogBuilder {
         self
     }
 
+    fn with_runtime(mut self, runtime: Runtime) -> Self {
+        self.runtime = Some(runtime);
+        self
+    }
+
     fn load(
         mut self,
         name: impl Into<String>,
@@ -190,7 +197,11 @@ impl CatalogBuilder for SqlCatalogBuilder {
                 ))
             } else {
                 self.config.name = name;
-                SqlCatalog::new(self.config, self.storage_factory).await
+                let runtime = match self.runtime {
+                    Some(rt) => rt,
+                    None => Runtime::try_current()?,
+                };
+                SqlCatalog::new(self.config, self.storage_factory, 
runtime).await
             }
         }
     }
@@ -221,6 +232,7 @@ pub struct SqlCatalog {
     warehouse_location: String,
     fileio: FileIO,
     sql_bind_style: SqlBindStyle,
+    runtime: Runtime,
 }
 
 #[derive(Debug, PartialEq, strum::EnumString, strum::Display)]
@@ -237,6 +249,7 @@ impl SqlCatalog {
     async fn new(
         config: SqlCatalogConfig,
         storage_factory: Option<Arc<dyn StorageFactory>>,
+        runtime: Runtime,
     ) -> Result<Self> {
         let factory = storage_factory.ok_or_else(|| {
             Error::new(
@@ -303,6 +316,7 @@ impl SqlCatalog {
             warehouse_location: config.warehouse_location,
             fileio,
             sql_bind_style: config.sql_bind_style,
+            runtime,
         })
     }
 
@@ -810,6 +824,7 @@ impl Catalog for SqlCatalog {
             .identifier(identifier.clone())
             .metadata_location(tbl_metadata_location)
             .metadata(metadata)
+            .runtime(self.runtime.clone())
             .build()?)
     }
 
@@ -880,6 +895,7 @@ impl Catalog for SqlCatalog {
             .metadata_location(tbl_metadata_location_str)
             .identifier(tbl_ident)
             .metadata(tbl_metadata)
+            .runtime(self.runtime.clone())
             .build()?)
     }
 
@@ -951,6 +967,7 @@ impl Catalog for SqlCatalog {
             .metadata_location(metadata_location)
             .metadata(metadata)
             .file_io(self.fileio.clone())
+            .runtime(self.runtime.clone())
             .build()?)
     }
 
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 18729176d..de34d365e 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -74,7 +74,7 @@ serde_json = { workspace = true }
 serde_repr = { workspace = true }
 serde_with = { workspace = true }
 strum = { workspace = true, features = ["derive"] }
-tokio = { workspace = true, optional = false, features = ["sync"] }
+tokio = { workspace = true, optional = false }
 typed-builder = { workspace = true }
 typetag = { workspace = true }
 url = { workspace = true }
diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs 
b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
index 231971fd5..81e92fcb9 100644
--- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -31,6 +31,7 @@ use crate::delete_vector::DeleteVector;
 use crate::expr::Predicate::AlwaysTrue;
 use crate::expr::{Predicate, Reference};
 use crate::io::FileIO;
+use crate::runtime::Runtime;
 use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
 use crate::spec::{
     DataContentType, Datum, ListType, MapType, NestedField, NestedFieldRef, 
PartnerAccessor,
@@ -46,6 +47,7 @@ pub(crate) struct CachingDeleteFileLoader {
     /// Shared filter state to allow caching loaded deletes across multiple
     /// calls to `load_deletes` (e.g., across multiple file scan tasks).
     delete_filter: DeleteFilter,
+    runtime: Runtime,
 }
 
 // Intermediate context during processing of a delete file task.
@@ -77,12 +79,17 @@ enum ParsedDeleteFileContext {
 
 #[allow(unused_variables)]
 impl CachingDeleteFileLoader {
-    pub(crate) fn new(file_io: FileIO, concurrency_limit_data_files: usize) -> 
Self {
+    pub(crate) fn new(
+        file_io: FileIO,
+        concurrency_limit_data_files: usize,
+        runtime: Runtime,
+    ) -> Self {
         let scan_metrics = ScanMetrics::new();
         CachingDeleteFileLoader {
             basic_delete_file_loader: BasicDeleteFileLoader::new(file_io, 
scan_metrics),
             concurrency_limit_data_files,
-            delete_filter: DeleteFilter::default(),
+            delete_filter: DeleteFilter::new(runtime.clone()),
+            runtime,
         }
     }
 
@@ -181,7 +188,7 @@ impl CachingDeleteFileLoader {
         let del_filter = self.delete_filter.clone();
         let concurrency_limit_data_files = self.concurrency_limit_data_files;
         let basic_delete_file_loader = self.basic_delete_file_loader.clone();
-        crate::runtime::spawn(async move {
+        self.runtime.io().spawn(async move {
             let result = async move {
                 let mut del_filter = del_filter;
                 let basic_delete_file_loader = 
basic_delete_file_loader.clone();
@@ -737,7 +744,8 @@ mod tests {
         let table_location = tmp_dir.path();
         let file_io = FileIO::new_with_fs();
 
-        let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 
10);
+        let delete_file_loader =
+            CachingDeleteFileLoader::new(file_io.clone(), 10, 
Runtime::current());
 
         let file_scan_tasks = setup(table_location);
 
@@ -958,7 +966,8 @@ mod tests {
         };
 
         // Load the deletes - should handle both types without error
-        let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 
10);
+        let delete_file_loader =
+            CachingDeleteFileLoader::new(file_io.clone(), 10, 
Runtime::current());
         let delete_filter = delete_file_loader
             .load_deletes(&file_scan_task.deletes, file_scan_task.schema_ref())
             .await
@@ -1030,7 +1039,8 @@ mod tests {
         let table_location = tmp_dir.path();
         let file_io = FileIO::new_with_fs();
 
-        let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 
10);
+        let delete_file_loader =
+            CachingDeleteFileLoader::new(file_io.clone(), 10, 
Runtime::current());
 
         let file_scan_tasks = setup(table_location);
 
diff --git a/crates/iceberg/src/arrow/delete_filter.rs 
b/crates/iceberg/src/arrow/delete_filter.rs
index 6369938ce..2dabc11ca 100644
--- a/crates/iceberg/src/arrow/delete_filter.rs
+++ b/crates/iceberg/src/arrow/delete_filter.rs
@@ -24,6 +24,7 @@ use tokio::sync::oneshot::Receiver;
 use crate::delete_vector::DeleteVector;
 use crate::expr::Predicate::AlwaysTrue;
 use crate::expr::{Bind, BoundPredicate, Predicate};
+use crate::runtime::Runtime;
 use crate::scan::{FileScanTask, FileScanTaskDeleteFile};
 use crate::spec::DataContentType;
 use crate::{Error, ErrorKind, Result};
@@ -53,9 +54,10 @@ struct DeleteFileFilterState {
     positional_deletes: HashMap<String, PosDelState>,
 }
 
-#[derive(Clone, Debug, Default)]
+#[derive(Clone, Debug)]
 pub(crate) struct DeleteFilter {
     state: Arc<RwLock<DeleteFileFilterState>>,
+    runtime: Runtime,
 }
 
 /// Action to take when trying to start loading a positional delete file
@@ -71,6 +73,14 @@ pub(crate) enum PosDelLoadAction {
 }
 
 impl DeleteFilter {
+    /// Create a new DeleteFilter with the given runtime.
+    pub(crate) fn new(runtime: Runtime) -> Self {
+        Self {
+            state: Arc::new(RwLock::new(DeleteFileFilterState::default())),
+            runtime,
+        }
+    }
+
     /// Retrieve a delete vector for the data file associated with a given 
file scan task
     pub(crate) fn get_delete_vector(
         &self,
@@ -245,7 +255,7 @@ impl DeleteFilter {
 
         let state = self.state.clone();
         let delete_file_path = delete_file_path.to_string();
-        crate::runtime::spawn(async move {
+        self.runtime.cpu().spawn(async move {
             let eq_del = eq_del.await.unwrap();
             {
                 let mut state = state.write().unwrap();
@@ -292,7 +302,8 @@ pub(crate) mod tests {
         let table_location = tmp_dir.path();
         let file_io = FileIO::new_with_fs();
 
-        let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 
10);
+        let delete_file_loader =
+            CachingDeleteFileLoader::new(file_io.clone(), 10, 
Runtime::current());
 
         let file_scan_tasks = setup(table_location);
 
@@ -503,7 +514,7 @@ pub(crate) mod tests {
             case_sensitive: true,
         };
 
-        let filter = DeleteFilter::default();
+        let filter = DeleteFilter::new(Runtime::current());
 
         // ---------- insert equality delete predicate ----------
         let pred = Reference::new("id").equal_to(Datum::long(10));
diff --git a/crates/iceberg/src/arrow/reader/mod.rs 
b/crates/iceberg/src/arrow/reader/mod.rs
index c6c41accb..7e3c2f807 100644
--- a/crates/iceberg/src/arrow/reader/mod.rs
+++ b/crates/iceberg/src/arrow/reader/mod.rs
@@ -19,6 +19,7 @@
 
 use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
 use crate::io::FileIO;
+use crate::runtime::Runtime;
 use crate::util::available_parallelism;
 
 /// Default gap between byte ranges below which they are coalesced into a
@@ -53,11 +54,12 @@ pub struct ArrowReaderBuilder {
     row_group_filtering_enabled: bool,
     row_selection_enabled: bool,
     parquet_read_options: ParquetReadOptions,
+    runtime: Runtime,
 }
 
 impl ArrowReaderBuilder {
     /// Create a new ArrowReaderBuilder
-    pub fn new(file_io: FileIO) -> Self {
+    pub fn new(file_io: FileIO, runtime: Runtime) -> Self {
         let num_cpus = available_parallelism().get();
 
         ArrowReaderBuilder {
@@ -67,6 +69,7 @@ impl ArrowReaderBuilder {
             row_group_filtering_enabled: true,
             row_selection_enabled: false,
             parquet_read_options: ParquetReadOptions::builder().build(),
+            runtime,
         }
     }
 
@@ -129,6 +132,7 @@ impl ArrowReaderBuilder {
             delete_file_loader: CachingDeleteFileLoader::new(
                 self.file_io.clone(),
                 self.concurrency_limit_data_files,
+                self.runtime.clone(),
             ),
             concurrency_limit_data_files: self.concurrency_limit_data_files,
             row_group_filtering_enabled: self.row_group_filtering_enabled,
diff --git a/crates/iceberg/src/arrow/reader/pipeline.rs 
b/crates/iceberg/src/arrow/reader/pipeline.rs
index 8ecee294c..edc0a3fa9 100644
--- a/crates/iceberg/src/arrow/reader/pipeline.rs
+++ b/crates/iceberg/src/arrow/reader/pipeline.rs
@@ -456,6 +456,7 @@ mod tests {
     use parquet::file::properties::WriterProperties;
     use tempfile::TempDir;
 
+    use crate::Runtime;
     use crate::arrow::ArrowReaderBuilder;
     use crate::io::FileIO;
     use crate::scan::{FileScanTask, FileScanTaskStream};
@@ -487,7 +488,7 @@ mod tests {
         project_field_ids: Vec<i32>,
     ) -> Vec<RecordBatch> {
         let file_io = FileIO::new_with_fs();
-        let reader = ArrowReaderBuilder::new(file_io).build();
+        let reader = ArrowReaderBuilder::new(file_io, 
Runtime::current()).build();
 
         let file_size = std::fs::metadata(file_path).unwrap().len();
         let task = FileScanTask {
@@ -698,7 +699,7 @@ mod tests {
         }
 
         // Read with concurrency=1 (fast-path)
-        let reader = ArrowReaderBuilder::new(file_io)
+        let reader = ArrowReaderBuilder::new(file_io, Runtime::current())
             .with_data_file_concurrency_limit(1)
             .build();
 
diff --git a/crates/iceberg/src/arrow/reader/positional_deletes.rs 
b/crates/iceberg/src/arrow/reader/positional_deletes.rs
index b2993572c..6cba80534 100644
--- a/crates/iceberg/src/arrow/reader/positional_deletes.rs
+++ b/crates/iceberg/src/arrow/reader/positional_deletes.rs
@@ -166,6 +166,7 @@ mod tests {
     use roaring::RoaringTreemap;
     use tempfile::TempDir;
 
+    use crate::Runtime;
     use crate::arrow::{ArrowReader, ArrowReaderBuilder};
     use crate::delete_vector::DeleteVector;
     use crate::io::FileIO;
@@ -432,7 +433,7 @@ mod tests {
 
         // Step 3: Read the data file with the delete applied
         let file_io = FileIO::new_with_fs();
-        let reader = ArrowReaderBuilder::new(file_io).build();
+        let reader = ArrowReaderBuilder::new(file_io, 
Runtime::current()).build();
 
         let task = FileScanTask {
             file_size_in_bytes: 
std::fs::metadata(&data_file_path).unwrap().len(),
@@ -652,7 +653,7 @@ mod tests {
         );
 
         let file_io = FileIO::new_with_fs();
-        let reader = ArrowReaderBuilder::new(file_io).build();
+        let reader = ArrowReaderBuilder::new(file_io, 
Runtime::current()).build();
 
         // Create FileScanTask that reads ONLY row group 1 via byte range 
filtering
         let task = FileScanTask {
@@ -867,7 +868,7 @@ mod tests {
         let rg1_length = row_group_1.compressed_size() as u64;
 
         let file_io = FileIO::new_with_fs();
-        let reader = ArrowReaderBuilder::new(file_io).build();
+        let reader = ArrowReaderBuilder::new(file_io, 
Runtime::current()).build();
 
         // Create FileScanTask that reads ONLY row group 1 via byte range 
filtering
         let task = FileScanTask {
diff --git a/crates/iceberg/src/arrow/reader/projection.rs 
b/crates/iceberg/src/arrow/reader/projection.rs
index deae027e1..2589c7836 100644
--- a/crates/iceberg/src/arrow/reader/projection.rs
+++ b/crates/iceberg/src/arrow/reader/projection.rs
@@ -431,12 +431,12 @@ mod tests {
     use parquet::schema::types::SchemaDescriptor;
     use tempfile::TempDir;
 
-    use crate::ErrorKind;
     use crate::arrow::{ArrowReader, ArrowReaderBuilder};
     use crate::expr::{Bind, Reference};
     use crate::io::FileIO;
     use crate::scan::{FileScanTask, FileScanTaskStream};
     use crate::spec::{DataFileFormat, Datum, NestedField, PrimitiveType, 
Schema, Type};
+    use crate::{ErrorKind, Runtime};
 
     #[test]
     fn test_arrow_projection_mask() {
@@ -576,7 +576,7 @@ message schema {
         writer.close().unwrap();
 
         // Read the old Parquet file using the NEW schema (with column 'b')
-        let reader = ArrowReaderBuilder::new(file_io).build();
+        let reader = ArrowReaderBuilder::new(file_io, 
Runtime::current()).build();
         let tasks = Box::pin(futures::stream::iter(
             vec![Ok(FileScanTask {
                 file_size_in_bytes: 
std::fs::metadata(format!("{table_location}/old_file.parquet"))
@@ -678,7 +678,7 @@ message schema {
         writer.write(&to_write).expect("Writing batch");
         writer.close().unwrap();
 
-        let reader = ArrowReaderBuilder::new(file_io).build();
+        let reader = ArrowReaderBuilder::new(file_io, 
Runtime::current()).build();
 
         let tasks = Box::pin(futures::stream::iter(
             vec![Ok(FileScanTask {
@@ -780,7 +780,7 @@ message schema {
         writer.write(&to_write).expect("Writing batch");
         writer.close().unwrap();
 
-        let reader = ArrowReaderBuilder::new(file_io).build();
+        let reader = ArrowReaderBuilder::new(file_io, 
Runtime::current()).build();
 
         let tasks = Box::pin(futures::stream::iter(
             vec![Ok(FileScanTask {
@@ -871,7 +871,7 @@ message schema {
         writer.write(&to_write).expect("Writing batch");
         writer.close().unwrap();
 
-        let reader = ArrowReaderBuilder::new(file_io).build();
+        let reader = ArrowReaderBuilder::new(file_io, 
Runtime::current()).build();
 
         let tasks = Box::pin(futures::stream::iter(
             vec![Ok(FileScanTask {
@@ -976,7 +976,7 @@ message schema {
         }
         writer.close().unwrap();
 
-        let reader = ArrowReaderBuilder::new(file_io).build();
+        let reader = ArrowReaderBuilder::new(file_io, 
Runtime::current()).build();
 
         let tasks = Box::pin(futures::stream::iter(
             vec![Ok(FileScanTask {
@@ -1110,7 +1110,7 @@ message schema {
         writer.write(&to_write).expect("Writing batch");
         writer.close().unwrap();
 
-        let reader = ArrowReaderBuilder::new(file_io).build();
+        let reader = ArrowReaderBuilder::new(file_io, 
Runtime::current()).build();
 
         let tasks = Box::pin(futures::stream::iter(
             vec![Ok(FileScanTask {
@@ -1211,7 +1211,7 @@ message schema {
         writer.write(&to_write).expect("Writing batch");
         writer.close().unwrap();
 
-        let reader = ArrowReaderBuilder::new(file_io).build();
+        let reader = ArrowReaderBuilder::new(file_io, 
Runtime::current()).build();
 
         let tasks = Box::pin(futures::stream::iter(
             vec![Ok(FileScanTask {
@@ -1322,7 +1322,7 @@ message schema {
         let predicate = Reference::new("id").less_than(Datum::int(5));
 
         // Enable both row_group_filtering and row_selection - triggered the 
panic
-        let reader = ArrowReaderBuilder::new(file_io)
+        let reader = ArrowReaderBuilder::new(file_io, Runtime::current())
             .with_row_group_filtering_enabled(true)
             .with_row_selection_enabled(true)
             .build();
@@ -1470,7 +1470,7 @@ message schema {
         writer.close().unwrap();
 
         // Read the Parquet file with partition spec and data
-        let reader = ArrowReaderBuilder::new(file_io).build();
+        let reader = ArrowReaderBuilder::new(file_io, 
Runtime::current()).build();
         let tasks = Box::pin(futures::stream::iter(
             vec![Ok(FileScanTask {
                 file_size_in_bytes: 
std::fs::metadata(format!("{table_location}/data.parquet"))
@@ -1680,7 +1680,7 @@ message schema {
 
         let predicate = Reference::new("id").greater_than(Datum::int(1));
 
-        let reader = ArrowReaderBuilder::new(FileIO::new_with_fs())
+        let reader = ArrowReaderBuilder::new(FileIO::new_with_fs(), 
Runtime::current())
             .with_row_group_filtering_enabled(true)
             .with_row_selection_enabled(true)
             .build();
diff --git a/crates/iceberg/src/arrow/reader/row_filter.rs 
b/crates/iceberg/src/arrow/reader/row_filter.rs
index 80432a043..bc5a05e47 100644
--- a/crates/iceberg/src/arrow/reader/row_filter.rs
+++ b/crates/iceberg/src/arrow/reader/row_filter.rs
@@ -204,6 +204,7 @@ mod tests {
     use parquet::file::properties::WriterProperties;
     use tempfile::TempDir;
 
+    use crate::Runtime;
     use crate::arrow::{ArrowReader, ArrowReaderBuilder};
     use crate::expr::{Bind, Predicate, Reference};
     use crate::io::FileIO;
@@ -320,7 +321,7 @@ mod tests {
 
         let (file_io, schema, table_location, _temp_dir) =
             setup_kleene_logic(data_for_col_a, DataType::Utf8);
-        let reader = ArrowReaderBuilder::new(file_io).build();
+        let reader = ArrowReaderBuilder::new(file_io, 
Runtime::current()).build();
 
         let result_data = test_perform_read(predicate, schema, table_location, 
reader).await;
 
@@ -342,7 +343,7 @@ mod tests {
 
         let (file_io, schema, table_location, _temp_dir) =
             setup_kleene_logic(data_for_col_a, DataType::Utf8);
-        let reader = ArrowReaderBuilder::new(file_io).build();
+        let reader = ArrowReaderBuilder::new(file_io, 
Runtime::current()).build();
 
         let result_data = test_perform_read(predicate, schema, table_location, 
reader).await;
 
@@ -406,7 +407,7 @@ mod tests {
 
         let (file_io, schema, table_location, _temp_dir) =
             setup_kleene_logic(data_for_col_a, DataType::LargeUtf8);
-        let reader = ArrowReaderBuilder::new(file_io).build();
+        let reader = ArrowReaderBuilder::new(file_io, 
Runtime::current()).build();
 
         for (predicate, expected) in predicates {
             println!("testing predicate {predicate}");
@@ -513,7 +514,7 @@ mod tests {
         );
 
         let file_io = FileIO::new_with_fs();
-        let reader = ArrowReaderBuilder::new(file_io).build();
+        let reader = ArrowReaderBuilder::new(file_io, 
Runtime::current()).build();
 
         // Task 1: read only the first row group
         let task1 = FileScanTask {
diff --git a/crates/iceberg/src/catalog/memory/catalog.rs 
b/crates/iceberg/src/catalog/memory/catalog.rs
index 868f1f3bc..3ae01a23d 100644
--- a/crates/iceberg/src/catalog/memory/catalog.rs
+++ b/crates/iceberg/src/catalog/memory/catalog.rs
@@ -27,6 +27,7 @@ use itertools::Itertools;
 
 use super::namespace_state::NamespaceState;
 use crate::io::{FileIO, FileIOBuilder, MemoryStorageFactory, StorageFactory};
+use crate::runtime::Runtime;
 use crate::spec::{TableMetadata, TableMetadataBuilder};
 use crate::table::Table;
 use crate::{
@@ -45,6 +46,7 @@ const LOCATION: &str = "location";
 pub struct MemoryCatalogBuilder {
     config: MemoryCatalogConfig,
     storage_factory: Option<Arc<dyn StorageFactory>>,
+    runtime: Option<Runtime>,
 }
 
 impl Default for MemoryCatalogBuilder {
@@ -56,6 +58,7 @@ impl Default for MemoryCatalogBuilder {
                 props: HashMap::new(),
             },
             storage_factory: None,
+            runtime: None,
         }
     }
 }
@@ -68,6 +71,11 @@ impl CatalogBuilder for MemoryCatalogBuilder {
         self
     }
 
+    fn with_runtime(mut self, runtime: Runtime) -> Self {
+        self.runtime = Some(runtime);
+        self
+    }
+
     fn load(
         mut self,
         name: impl Into<String>,
@@ -100,7 +108,8 @@ impl CatalogBuilder for MemoryCatalogBuilder {
                     "Catalog warehouse is required",
                 ))
             } else {
-                MemoryCatalog::new(self.config, self.storage_factory)
+                let runtime = self.runtime.unwrap_or_else(Runtime::current);
+                MemoryCatalog::new(self.config, self.storage_factory, runtime)
             }
         };
 
@@ -121,6 +130,7 @@ pub struct MemoryCatalog {
     root_namespace_state: Mutex<NamespaceState>,
     file_io: FileIO,
     warehouse_location: String,
+    runtime: Runtime,
 }
 
 impl MemoryCatalog {
@@ -128,6 +138,7 @@ impl MemoryCatalog {
     fn new(
         config: MemoryCatalogConfig,
         storage_factory: Option<Arc<dyn StorageFactory>>,
+        runtime: Runtime,
     ) -> Result<Self> {
         // Use provided factory or default to MemoryStorageFactory
         let factory = storage_factory.unwrap_or_else(|| 
Arc::new(MemoryStorageFactory));
@@ -136,6 +147,7 @@ impl MemoryCatalog {
             root_namespace_state: Mutex::new(NamespaceState::default()),
             file_io: 
FileIOBuilder::new(factory).with_props(config.props).build(),
             warehouse_location: config.warehouse,
+            runtime,
         })
     }
 
@@ -153,6 +165,7 @@ impl MemoryCatalog {
             .metadata(metadata)
             .metadata_location(metadata_location.to_string())
             .file_io(self.file_io.clone())
+            .runtime(self.runtime.clone())
             .build()
     }
 }
@@ -307,6 +320,7 @@ impl Catalog for MemoryCatalog {
             .metadata_location(metadata_location.to_string())
             .metadata(metadata)
             .identifier(table_ident)
+            .runtime(self.runtime.clone())
             .build()
     }
 
@@ -378,6 +392,7 @@ impl Catalog for MemoryCatalog {
             .metadata_location(metadata_location)
             .metadata(metadata)
             .identifier(table_ident.clone())
+            .runtime(self.runtime.clone())
             .build()
     }
 
@@ -420,6 +435,7 @@ pub(crate) mod tests {
     use super::*;
     use crate::io::FileIO;
     use crate::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, 
SortOrder, Type};
+    use crate::test_utils::test_runtime;
     use crate::transaction::{ApplyTransactionAction, Transaction};
 
     fn temp_path() -> String {
@@ -1870,6 +1886,11 @@ pub(crate) mod tests {
         // Assert the table doesn't contain the update yet
         assert!(!table.metadata().properties().contains_key("key"));
 
+        // `last_updated_ms` is millisecond-resolution `chrono::Utc::now()`.
+        // Without this sleep, create and commit can land in the same
+        // millisecond and the `<` assertion below flakes.
+        tokio::time::sleep(std::time::Duration::from_millis(2)).await;
+
         // Update table metadata
         let tx = Transaction::new(&table);
         let updated_table = tx
@@ -1939,6 +1960,7 @@ pub(crate) mod tests {
             .identifier(ident)
             .metadata(metadata)
             .file_io(file_io)
+            .runtime(test_runtime())
             .build()
             .unwrap()
     }
diff --git a/crates/iceberg/src/catalog/mod.rs 
b/crates/iceberg/src/catalog/mod.rs
index 51daa5059..3ab0f2886 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -40,6 +40,7 @@ use typed_builder::TypedBuilder;
 use uuid::Uuid;
 
 use crate::io::StorageFactory;
+use crate::runtime::Runtime;
 use crate::spec::{
     EncryptedKey, FormatVersion, PartitionStatisticsFile, Schema, SchemaId, 
Snapshot,
     SnapshotReference, SortOrder, StatisticsFile, TableMetadata, 
TableMetadataBuilder,
@@ -151,6 +152,13 @@ pub trait CatalogBuilder: Default + Debug + Send + Sync {
     /// ```
     fn with_storage_factory(self, storage_factory: Arc<dyn StorageFactory>) -> 
Self;
 
+    /// Set a custom [`Runtime`] to use for spawning async tasks.
+    ///
+    /// When a Runtime is provided, the catalog will propagate it to all tables
+    /// it creates. Tasks such as scan planning and delete file processing
+    /// will be spawned on this runtime.
+    fn with_runtime(self, runtime: Runtime) -> Self;
+
     /// Create a new catalog instance.
     fn load(
         self,
@@ -1076,6 +1084,7 @@ mod tests {
         ViewVersion,
     };
     use crate::table::Table;
+    use crate::test_utils::test_runtime;
     use crate::{
         NamespaceIdent, TableCommit, TableCreation, TableIdent, 
TableRequirement, TableUpdate,
     };
@@ -2393,6 +2402,7 @@ mod tests {
                 
.metadata_location("s3://bucket/test/location/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json")
                 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
                 .file_io(FileIO::new_with_memory())
+                .runtime(test_runtime())
                 .build()
                 .unwrap()
         };
diff --git a/crates/iceberg/src/delete_file_index.rs 
b/crates/iceberg/src/delete_file_index.rs
index 4f6fd2848..07394bb85 100644
--- a/crates/iceberg/src/delete_file_index.rs
+++ b/crates/iceberg/src/delete_file_index.rs
@@ -23,7 +23,7 @@ use futures::StreamExt;
 use futures::channel::mpsc::{Sender, channel};
 use tokio::sync::Notify;
 
-use crate::runtime::spawn;
+use crate::runtime::Runtime;
 use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile};
 use crate::spec::{DataContentType, DataFile, Struct};
 
@@ -53,7 +53,7 @@ struct PopulatedDeleteFileIndex {
 
 impl DeleteFileIndex {
     /// create a new `DeleteFileIndex` along with the sender that populates it 
with delete files
-    pub(crate) fn new() -> (DeleteFileIndex, Sender<DeleteFileContext>) {
+    pub(crate) fn new(runtime: Runtime) -> (DeleteFileIndex, 
Sender<DeleteFileContext>) {
         // TODO: what should the channel limit be?
         let (tx, rx) = channel(10);
         let notify = Arc::new(Notify::new());
@@ -62,7 +62,7 @@ impl DeleteFileIndex {
         )));
         let delete_file_stream = rx.boxed();
 
-        spawn({
+        runtime.io().spawn({
             let state = state.clone();
             async move {
                 let delete_files: Vec<DeleteFileContext> =
diff --git a/crates/iceberg/src/io/object_cache.rs 
b/crates/iceberg/src/io/object_cache.rs
index 8881471ae..5de45e2ac 100644
--- a/crates/iceberg/src/io/object_cache.rs
+++ b/crates/iceberg/src/io/object_cache.rs
@@ -198,6 +198,7 @@ mod tests {
         ManifestListWriter, ManifestStatus, ManifestWriterBuilder, Struct, 
TableMetadata,
     };
     use crate::table::Table;
+    use crate::test_utils::test_runtime;
 
     fn render_template(template: &str, ctx: Value) -> String {
         let mut env = Environment::new();
@@ -240,6 +241,7 @@ mod tests {
                 .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                 .file_io(file_io.clone())
                 
.metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
+                .runtime(test_runtime())
                 .build()
                 .unwrap();
 
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index ae0708146..4e346460f 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -90,6 +90,7 @@ pub mod transaction;
 pub mod transform;
 
 mod runtime;
+pub use runtime::{Runtime, RuntimeHandle};
 
 pub mod arrow;
 pub(crate) mod delete_file_index;
diff --git a/crates/iceberg/src/runtime/mod.rs 
b/crates/iceberg/src/runtime/mod.rs
index 61aa623f5..c04aef330 100644
--- a/crates/iceberg/src/runtime/mod.rs
+++ b/crates/iceberg/src/runtime/mod.rs
@@ -17,59 +17,296 @@
 
 // This module contains the async runtime abstraction for iceberg.
 
+use std::fmt;
 use std::future::Future;
 use std::pin::Pin;
 use std::task::{Context, Poll};
 
 use tokio::task;
 
+use crate::{Error, ErrorKind, Result};
+
+/// Wrapper around tokio's `JoinHandle` that converts task failures into
+/// [`iceberg::Error`].
+///
+/// Tokio's `JoinHandle<T>` resolves to `Result<T, JoinError>`, where a
+/// `JoinError` means the task either panicked or was cancelled (typically from
+/// runtime shutdown or `abort`). Both are surfaced here as
+/// `ErrorKind::Unexpected` with the original `JoinError` preserved as the
+/// source.
 pub struct JoinHandle<T>(task::JoinHandle<T>);
 
 impl<T> Unpin for JoinHandle<T> {}
 
 impl<T: Send + 'static> Future for JoinHandle<T> {
-    type Output = T;
+    type Output = crate::Result<T>;
 
     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-        match self.get_mut() {
-            JoinHandle(handle) => Pin::new(handle)
-                .poll(cx)
-                .map(|r| r.expect("tokio spawned task failed")),
-        }
+        Pin::new(&mut self.get_mut().0).poll(cx).map(|r| {
+            r.map_err(|e| Error::new(ErrorKind::Unexpected, "spawned task 
failed").with_source(e))
+        })
+    }
+}
+
+/// Handle to a single tokio runtime.
+///
+/// Wraps a [`tokio::runtime::Handle`], which is cheap to clone. The caller is
+/// responsible for keeping the underlying runtime alive while this handle is
+/// in use; a task spawned on a shut-down runtime resolves, via [`JoinHandle`],
+/// to an `ErrorKind::Unexpected` error whose source is the `JoinError`.
+#[derive(Clone)]
+pub struct RuntimeHandle {
+    handle: tokio::runtime::Handle,
+}
+
+impl fmt::Debug for RuntimeHandle {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("RuntimeHandle").finish()
+    }
+}
+
+impl RuntimeHandle {
+    fn from_tokio_handle(handle: tokio::runtime::Handle) -> Self {
+        Self { handle }
+    }
+
+    /// Spawn an async task.
+    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
+    where
+        F: Future + Send + 'static,
+        F::Output: Send + 'static,
+    {
+        JoinHandle(self.handle.spawn(future))
+    }
+
+    /// Spawn a blocking task.
+    pub fn spawn_blocking<F, T>(&self, f: F) -> JoinHandle<T>
+    where
+        F: FnOnce() -> T + Send + 'static,
+        T: Send + 'static,
+    {
+        JoinHandle(self.handle.spawn_blocking(f))
     }
 }
 
-#[allow(dead_code)]
-pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
-where
-    F: std::future::Future + Send + 'static,
-    F::Output: Send + 'static,
-{
-    JoinHandle(task::spawn(f))
+/// Iceberg's runtime abstraction.
+///
+/// Contains separate handles for IO-bound and CPU-bound work. When constructed
+/// with a single tokio runtime, both `io()` and `cpu()` route to the same one.
+/// Use [`Runtime::new_with_split`] to provide dedicated runtimes for each
+/// category.
+///
+/// # Lifetime
+///
+/// A `Runtime` stores only `tokio::runtime::Handle`s (weak references). The
+/// caller owns the tokio runtime's lifetime. If the underlying runtime is
+/// dropped while iceberg is still using it, subsequent spawns will surface as
+/// task cancellation errors via [`JoinHandle`].
+///
+/// Cloning is cheap.
+#[derive(Clone)]
+pub struct Runtime {
+    io: RuntimeHandle,
+    cpu: RuntimeHandle,
 }
 
-#[allow(dead_code)]
-pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
-where
-    F: FnOnce() -> T + Send + 'static,
-    T: Send + 'static,
-{
-    JoinHandle(task::spawn_blocking(f))
+impl fmt::Debug for Runtime {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Runtime").finish()
+    }
+}
+
+impl Runtime {
+    /// Create a Runtime backed by a single tokio runtime for all work.
+    pub fn new(runtime: &tokio::runtime::Runtime) -> Self {
+        let handle = 
RuntimeHandle::from_tokio_handle(runtime.handle().clone());
+        Self {
+            io: handle.clone(),
+            cpu: handle,
+        }
+    }
+
+    /// Create a Runtime with separate tokio runtimes for IO and CPU work.
+    pub fn new_with_split(
+        io_runtime: &tokio::runtime::Runtime,
+        cpu_runtime: &tokio::runtime::Runtime,
+    ) -> Self {
+        Self {
+            io: RuntimeHandle::from_tokio_handle(io_runtime.handle().clone()),
+            cpu: 
RuntimeHandle::from_tokio_handle(cpu_runtime.handle().clone()),
+        }
+    }
+
+    /// Borrows the tokio runtime the caller is currently running in.
+    ///
+    /// Panics if called outside a tokio runtime context. Use
+    /// [`Runtime::try_current`] for a fallible version.
+    ///
+    /// Iceberg never implicitly spawns its own runtime; callers outside a
+    /// tokio context must construct one explicitly via [`Runtime::new`] or
+    /// [`Runtime::new_with_split`].
+    pub fn current() -> Self {
+        Self::try_current().expect(
+            "Runtime::current() called outside a tokio runtime context. \
+             Call it from within #[tokio::main] / #[tokio::test], or construct 
\
+             a Runtime explicitly via Runtime::new / Runtime::new_with_split.",
+        )
+    }
+
+    /// Fallible variant of [`Runtime::current`]. Returns an error if no tokio
+    /// runtime is available in the current context.
+    pub fn try_current() -> Result<Self> {
+        let handle = tokio::runtime::Handle::try_current().map_err(|e| {
+            Error::new(
+                ErrorKind::Unexpected,
+                "no tokio runtime in context; call Runtime::try_current() \
+                 from within a tokio runtime, or construct a Runtime 
explicitly \
+                 via Runtime::new / Runtime::new_with_split",
+            )
+            .with_source(e)
+        })?;
+        let rh = RuntimeHandle::from_tokio_handle(handle);
+        Ok(Self {
+            io: rh.clone(),
+            cpu: rh,
+        })
+    }
+
+    /// Handle for IO-bound work (network fetches, file reads).
+    pub fn io(&self) -> &RuntimeHandle {
+        &self.io
+    }
+
+    /// Handle for CPU-bound work (decoding, predicate eval, projection).
+    pub fn cpu(&self) -> &RuntimeHandle {
+        &self.cpu
+    }
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
 
-    #[tokio::test]
-    async fn test_tokio_spawn() {
-        let handle = spawn(async { 1 + 1 });
-        assert_eq!(handle.await, 2);
+    /// A test harness that owns a tokio runtime and exposes a `Runtime` handle
+    /// plus a `block_on` helper for sync test bodies.
+    struct TestRuntime {
+        tokio: tokio::runtime::Runtime,
+        rt: Runtime,
+    }
+
+    impl TestRuntime {
+        fn new() -> Self {
+            let tokio = tokio::runtime::Builder::new_multi_thread()
+                .enable_all()
+                .build()
+                .expect("Failed to build tokio runtime");
+            let rt = Runtime::new(&tokio);
+            Self { tokio, rt }
+        }
+
+        fn block_on<F: Future>(&self, f: F) -> F::Output {
+            self.tokio.block_on(f)
+        }
+    }
+
+    #[test]
+    fn test_runtime_spawn_io() {
+        let h = TestRuntime::new();
+        let handle = h.rt.io().spawn(async { 1 + 1 });
+        assert_eq!(h.block_on(handle).unwrap(), 2);
+    }
+
+    #[test]
+    fn test_runtime_spawn_cpu() {
+        let h = TestRuntime::new();
+        let handle = h.rt.cpu().spawn(async { 3 + 4 });
+        assert_eq!(h.block_on(handle).unwrap(), 7);
+    }
+
+    #[test]
+    fn test_runtime_spawn_blocking() {
+        let h = TestRuntime::new();
+        let handle = h.rt.cpu().spawn_blocking(|| 1 + 1);
+        assert_eq!(h.block_on(handle).unwrap(), 2);
     }
 
-    #[tokio::test]
-    async fn test_tokio_spawn_blocking() {
-        let handle = spawn_blocking(|| 1 + 1);
-        assert_eq!(handle.await, 2);
+    #[test]
+    fn test_runtime_new_with_custom_runtime() {
+        let h = TestRuntime::new();
+        let handle = h.rt.io().spawn(async { 42 });
+        assert_eq!(h.block_on(handle).unwrap(), 42);
+    }
+
+    #[test]
+    fn test_runtime_split_uses_separate_handles() {
+        let io_rt = tokio::runtime::Builder::new_multi_thread()
+            .enable_all()
+            .build()
+            .unwrap();
+        let cpu_rt = tokio::runtime::Builder::new_multi_thread()
+            .enable_all()
+            .build()
+            .unwrap();
+        let rt = Runtime::new_with_split(&io_rt, &cpu_rt);
+        // Spawn on each handle and confirm both runtimes are live. We use
+        // `io_rt`/`cpu_rt` directly to `block_on` since our `Runtime` doesn't
+        // expose one.
+        let io_result = io_rt.block_on(async { rt.io().spawn(async { "io" 
}).await.unwrap() });
+        let cpu_result = cpu_rt.block_on(async { rt.cpu().spawn(async { "cpu" 
}).await.unwrap() });
+        assert_eq!(io_result, "io");
+        assert_eq!(cpu_result, "cpu");
+    }
+
+    #[test]
+    fn test_runtime_clone() {
+        let h = TestRuntime::new();
+        let rt2 = h.rt.clone();
+        let handle = rt2.io().spawn(async { 5 });
+        assert_eq!(h.block_on(handle).unwrap(), 5);
+    }
+
+    #[test]
+    fn test_runtime_debug() {
+        let h = TestRuntime::new();
+        let debug_str = format!("{:?}", h.rt);
+        assert!(debug_str.contains("Runtime"));
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_try_current_in_runtime() {
+        let rt = Runtime::try_current().expect("should find current runtime");
+        let result = rt.io().spawn(async { 7 }).await.unwrap();
+        assert_eq!(result, 7);
+    }
+
+    #[test]
+    fn test_try_current_outside_runtime() {
+        let err = Runtime::try_current().expect_err("must fail outside 
runtime");
+        assert_eq!(err.kind(), ErrorKind::Unexpected);
+    }
+
+    /// Verifies that when the caller drops the underlying tokio runtime, a
+    /// subsequent spawn resolves to an error via our `JoinHandle` rather
+    /// than hanging or misbehaving.
+    #[test]
+    fn test_spawn_after_runtime_drop_errors() {
+        let driver = tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()
+            .unwrap();
+        let owned = tokio::runtime::Builder::new_multi_thread()
+            .enable_all()
+            .build()
+            .unwrap();
+        let rt = Runtime::new(&owned);
+        // Drop the caller's owned runtime. Our `Runtime` only holds a `Handle`,
+        // so this shuts down the underlying tokio runtime.
+        drop(owned);
+
+        // Spawning after shutdown returns a handle that resolves to a cancelled
+        // JoinError, which our wrapper maps to iceberg::Error.
+        let handle = rt.io().spawn(async { 1 });
+        let result = driver.block_on(handle);
+        assert!(result.is_err(), "expected error after runtime shutdown");
     }
 }
diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs
index 27f479183..21822d9f0 100644
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@ -38,7 +38,7 @@ use 
crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluato
 use crate::expr::{Bind, BoundPredicate, Predicate};
 use crate::io::FileIO;
 use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name};
-use crate::runtime::spawn;
+use crate::runtime::Runtime;
 use crate::spec::{DataContentType, SnapshotRef};
 use crate::table::Table;
 use crate::util::available_parallelism;
@@ -211,6 +211,7 @@ impl<'a> TableScanBuilder<'a> {
                         concurrency_limit_manifest_files: 
self.concurrency_limit_manifest_files,
                         row_group_filtering_enabled: 
self.row_group_filtering_enabled,
                         row_selection_enabled: self.row_selection_enabled,
+                        runtime: self.table.runtime().clone(),
                     });
                 };
                 current_snapshot_id.clone()
@@ -304,6 +305,7 @@ impl<'a> TableScanBuilder<'a> {
             concurrency_limit_manifest_files: 
self.concurrency_limit_manifest_files,
             row_group_filtering_enabled: self.row_group_filtering_enabled,
             row_selection_enabled: self.row_selection_enabled,
+            runtime: self.table.runtime().clone(),
         })
     }
 }
@@ -332,6 +334,8 @@ pub struct TableScan {
 
     row_group_filtering_enabled: bool,
     row_selection_enabled: bool,
+
+    runtime: Runtime,
 }
 
 impl TableScan {
@@ -353,7 +357,7 @@ impl TableScan {
         // used to stream the results back to the caller
         let (file_scan_task_tx, file_scan_task_rx) = 
channel(concurrency_limit_manifest_entries);
 
-        let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new();
+        let (delete_file_idx, delete_file_tx) = 
DeleteFileIndex::new(self.runtime.clone());
 
         let manifest_list = plan_context.get_manifest_list().await?;
 
@@ -368,9 +372,13 @@ impl TableScan {
         )?;
 
         let mut channel_for_manifest_error = file_scan_task_tx.clone();
+        let mut channel_for_data_manifest_entry_error = 
file_scan_task_tx.clone();
+        let mut channel_for_delete_manifest_entry_error = 
file_scan_task_tx.clone();
+
+        let rt = self.runtime.clone();
 
         // Concurrently load all [`Manifest`]s and stream their [`ManifestEntry`]s
-        spawn(async move {
+        rt.io().spawn(async move {
             let result = futures::stream::iter(manifest_file_contexts)
                 .try_for_each_concurrent(concurrency_limit_manifest_files, 
|ctx| async move {
                     ctx.fetch_manifest_and_stream_manifest_entries().await
@@ -382,61 +390,83 @@ impl TableScan {
             }
         });
 
-        let mut channel_for_data_manifest_entry_error = 
file_scan_task_tx.clone();
-        let mut channel_for_delete_manifest_entry_error = 
file_scan_task_tx.clone();
-
         // Process the delete file [`ManifestEntry`] stream in parallel
-        spawn(async move {
-            let result = manifest_entry_delete_ctx_rx
-                .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
-                .try_for_each_concurrent(
-                    concurrency_limit_manifest_entries,
-                    |(manifest_entry_context, tx)| async move {
-                        spawn(async move {
-                            
Self::process_delete_manifest_entry(manifest_entry_context, tx).await
-                        })
-                        .await
-                    },
-                )
-                .await;
-
-            if let Err(error) = result {
-                let _ = channel_for_delete_manifest_entry_error
-                    .send(Err(error))
+        {
+            let rt = rt.clone();
+            let rt_inner = rt.clone();
+            rt.cpu().spawn(async move {
+                let result = manifest_entry_delete_ctx_rx
+                    .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
+                    .try_for_each_concurrent(
+                        concurrency_limit_manifest_entries,
+                        |(manifest_entry_context, tx)| {
+                            let rt_inner = rt_inner.clone();
+                            async move {
+                                rt_inner
+                                    .cpu()
+                                    .spawn(async move {
+                                        Self::process_delete_manifest_entry(
+                                            manifest_entry_context,
+                                            tx,
+                                        )
+                                        .await
+                                    })
+                                    .await?
+                            }
+                        },
+                    )
                     .await;
-            }
-        })
-        .await;
+
+                if let Err(error) = result {
+                    let _ = channel_for_delete_manifest_entry_error
+                        .send(Err(error))
+                        .await;
+                }
+            });
+        }
 
         // Process the data file [`ManifestEntry`] stream in parallel
-        spawn(async move {
-            let result = manifest_entry_data_ctx_rx
-                .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
-                .try_for_each_concurrent(
-                    concurrency_limit_manifest_entries,
-                    |(manifest_entry_context, tx)| async move {
-                        spawn(async move {
-                            
Self::process_data_manifest_entry(manifest_entry_context, tx).await
-                        })
-                        .await
-                    },
-                )
-                .await;
+        {
+            let rt_inner = rt.clone();
+            rt.cpu().spawn(async move {
+                let result = manifest_entry_data_ctx_rx
+                    .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
+                    .try_for_each_concurrent(
+                        concurrency_limit_manifest_entries,
+                        |(manifest_entry_context, tx)| {
+                            let rt_inner = rt_inner.clone();
+                            async move {
+                                rt_inner
+                                    .cpu()
+                                    .spawn(async move {
+                                        Self::process_data_manifest_entry(
+                                            manifest_entry_context,
+                                            tx,
+                                        )
+                                        .await
+                                    })
+                                    .await?
+                            }
+                        },
+                    )
+                    .await;
 
-            if let Err(error) = result {
-                let _ = 
channel_for_data_manifest_entry_error.send(Err(error)).await;
-            }
-        });
+                if let Err(error) = result {
+                    let _ = 
channel_for_data_manifest_entry_error.send(Err(error)).await;
+                }
+            });
+        }
 
         Ok(file_scan_task_rx.boxed())
     }
 
     /// Returns an [`ArrowRecordBatchStream`].
     pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
-        let mut arrow_reader_builder = 
ArrowReaderBuilder::new(self.file_io.clone())
-            
.with_data_file_concurrency_limit(self.concurrency_limit_data_files)
-            .with_row_group_filtering_enabled(self.row_group_filtering_enabled)
-            .with_row_selection_enabled(self.row_selection_enabled);
+        let mut arrow_reader_builder =
+            ArrowReaderBuilder::new(self.file_io.clone(), self.runtime.clone())
+                
.with_data_file_concurrency_limit(self.concurrency_limit_data_files)
+                
.with_row_group_filtering_enabled(self.row_group_filtering_enabled)
+                .with_row_selection_enabled(self.row_selection_enabled);
 
         if let Some(batch_size) = self.batch_size {
             arrow_reader_builder = 
arrow_reader_builder.with_batch_size(batch_size);
@@ -600,6 +630,7 @@ pub mod tests {
         PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
     };
     use crate::table::Table;
+    use crate::test_utils::test_runtime;
 
     fn render_template(template: &str, ctx: Value) -> String {
         let mut env = Environment::new();
@@ -643,6 +674,7 @@ pub mod tests {
                 .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                 .file_io(file_io.clone())
                 
.metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
+                .runtime(test_runtime())
                 .build()
                 .unwrap();
 
@@ -678,6 +710,7 @@ pub mod tests {
                 .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                 .file_io(file_io.clone())
                 
.metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
+                .runtime(test_runtime())
                 .build()
                 .unwrap();
 
@@ -711,6 +744,7 @@ pub mod tests {
                 .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                 .file_io(file_io.clone())
                 
.metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
+                .runtime(test_runtime())
                 .build()
                 .unwrap();
 
@@ -756,6 +790,7 @@ pub mod tests {
                 .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                 .file_io(file_io.clone())
                 .metadata_location(table_metadata1_location.to_str().unwrap())
+                .runtime(test_runtime())
                 .build()
                 .unwrap();
 
@@ -1200,8 +1235,8 @@ pub mod tests {
         }
     }
 
-    #[test]
-    fn test_table_scan_columns() {
+    #[tokio::test]
+    async fn test_table_scan_columns() {
         let table = TableTestFixture::new().table;
 
         let table_scan = table.scan().select(["x", "y"]).build().unwrap();
@@ -1219,8 +1254,8 @@ pub mod tests {
         assert_eq!(Some(vec!["z".to_string()]), table_scan.column_names);
     }
 
-    #[test]
-    fn test_select_all() {
+    #[tokio::test]
+    async fn test_select_all() {
         let table = TableTestFixture::new().table;
 
         let table_scan = table.scan().select_all().build().unwrap();
@@ -1235,8 +1270,8 @@ pub mod tests {
         assert!(table_scan.is_err());
     }
 
-    #[test]
-    fn test_table_scan_default_snapshot_id() {
+    #[tokio::test]
+    async fn test_table_scan_default_snapshot_id() {
         let table = TableTestFixture::new().table;
 
         let table_scan = table.scan().build().unwrap();
@@ -1254,8 +1289,8 @@ pub mod tests {
         assert!(table_scan.is_err());
     }
 
-    #[test]
-    fn test_table_scan_with_snapshot_id() {
+    #[tokio::test]
+    async fn test_table_scan_with_snapshot_id() {
         let table = TableTestFixture::new().table;
 
         let table_scan = table
@@ -1364,7 +1399,11 @@ pub mod tests {
             .unwrap();
         assert_eq!(plan_task.len(), 2);
 
-        let reader = 
ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
+        let reader = ArrowReaderBuilder::new(
+            fixture.table.file_io().clone(),
+            fixture.table.runtime().clone(),
+        )
+        .build();
         let batch_stream = reader
             .clone()
             .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
@@ -1372,7 +1411,11 @@ pub mod tests {
             .stream();
         let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();
 
-        let reader = 
ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
+        let reader = ArrowReaderBuilder::new(
+            fixture.table.file_io().clone(),
+            fixture.table.runtime().clone(),
+        )
+        .build();
         let batch_stream = reader
             .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
             .unwrap()
diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs 
b/crates/iceberg/src/spec/table_metadata_builder.rs
index 5754b5fe0..3191d6c13 100644
--- a/crates/iceberg/src/spec/table_metadata_builder.rs
+++ b/crates/iceberg/src/spec/table_metadata_builder.rs
@@ -1473,6 +1473,7 @@ mod tests {
         Transform, Type, UnboundPartitionField,
     };
     use crate::table::Table;
+    use crate::test_utils::test_runtime;
 
     const TEST_LOCATION: &str = "s3://bucket/test/location";
     const LAST_ASSIGNED_COLUMN_ID: i32 = 3;
@@ -2712,6 +2713,7 @@ mod tests {
             .metadata_location("s3://bucket/test/location/metadata/v1.json")
             .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
             .file_io(FileIO::new_with_memory())
+            .runtime(test_runtime())
             .build()
             .unwrap();
 
@@ -2743,6 +2745,7 @@ mod tests {
             .metadata_location("s3://bucket/test/location/metadata/v1.json")
             .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
             .file_io(FileIO::new_with_memory())
+            .runtime(test_runtime())
             .build()
             .unwrap();
 
diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs
index 56ddbd51b..d2ba93f85 100644
--- a/crates/iceberg/src/table.rs
+++ b/crates/iceberg/src/table.rs
@@ -23,6 +23,7 @@ use crate::arrow::ArrowReaderBuilder;
 use crate::inspect::MetadataTable;
 use crate::io::FileIO;
 use crate::io::object_cache::ObjectCache;
+use crate::runtime::Runtime;
 use crate::scan::TableScanBuilder;
 use crate::spec::{SchemaRef, TableMetadata, TableMetadataRef};
 use crate::{Error, ErrorKind, Result, TableIdent};
@@ -36,6 +37,7 @@ pub struct TableBuilder {
     readonly: bool,
     disable_cache: bool,
     cache_size_bytes: Option<u64>,
+    runtime: Option<Runtime>,
 }
 
 impl TableBuilder {
@@ -48,6 +50,7 @@ impl TableBuilder {
             readonly: false,
             disable_cache: false,
             cache_size_bytes: None,
+            runtime: None,
         }
     }
 
@@ -95,6 +98,12 @@ impl TableBuilder {
         self
     }
 
+    /// Set the Runtime for this table to use when spawning tasks.
+    pub fn runtime(mut self, runtime: Runtime) -> Self {
+        self.runtime = Some(runtime);
+        self
+    }
+
     /// build the Table
     pub fn build(self) -> Result<Table> {
         let Self {
@@ -105,6 +114,7 @@ impl TableBuilder {
             readonly,
             disable_cache,
             cache_size_bytes,
+            runtime,
         } = self;
 
         let Some(file_io) = file_io else {
@@ -128,6 +138,13 @@ impl TableBuilder {
             ));
         };
 
+        let Some(runtime) = runtime else {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Runtime must be provided with TableBuilder.runtime()",
+            ));
+        };
+
         let object_cache = if disable_cache {
             Arc::new(ObjectCache::with_disabled_cache(file_io.clone()))
         } else if let Some(cache_size_bytes) = cache_size_bytes {
@@ -146,6 +163,7 @@ impl TableBuilder {
             identifier,
             readonly,
             object_cache,
+            runtime,
         })
     }
 }
@@ -159,6 +177,7 @@ pub struct Table {
     identifier: TableIdent,
     readonly: bool,
     object_cache: Arc<ObjectCache>,
+    runtime: Runtime,
 }
 
 impl Table {
@@ -230,6 +249,11 @@ impl Table {
         MetadataTable::new(self)
     }
 
+    /// Returns the [`Runtime`] for this table.
+    pub(crate) fn runtime(&self) -> &Runtime {
+        &self.runtime
+    }
+
     /// Returns the flag indicating whether the `Table` is readonly or not
     pub fn readonly(&self) -> bool {
         self.readonly
@@ -242,7 +266,7 @@ impl Table {
 
     /// Create a reader for the table.
     pub fn reader_builder(&self) -> ArrowReaderBuilder {
-        ArrowReaderBuilder::new(self.file_io.clone())
+        ArrowReaderBuilder::new(self.file_io.clone(), self.runtime().clone())
     }
 }
 
@@ -283,6 +307,7 @@ impl StaticTable {
             .metadata(metadata)
             .identifier(table_ident)
             .file_io(file_io.clone())
+            .runtime(Runtime::try_current()?)
             .readonly(true)
             .build();
 
@@ -301,6 +326,7 @@ impl StaticTable {
             .metadata_location(metadata_location)
             .identifier(table_ident)
             .file_io(file_io.clone())
+            .runtime(Runtime::try_current()?)
             .readonly(true)
             .build();
 
@@ -326,7 +352,7 @@ impl StaticTable {
 
     /// Create a reader for the table.
     pub fn reader_builder(&self) -> ArrowReaderBuilder {
-        ArrowReaderBuilder::new(self.0.file_io.clone())
+        self.0.reader_builder()
     }
 }
 
@@ -400,6 +426,7 @@ mod tests {
             .metadata(table_metadata)
             .identifier(static_identifier)
             .file_io(file_io.clone())
+            .runtime(Runtime::try_current().unwrap())
             .build()
             .unwrap();
         assert!(!table.readonly());
diff --git a/crates/iceberg/src/test_utils.rs b/crates/iceberg/src/test_utils.rs
index 527d37bb6..d47c39950 100644
--- a/crates/iceberg/src/test_utils.rs
+++ b/crates/iceberg/src/test_utils.rs
@@ -19,10 +19,32 @@
 //! This module is pub just for internal testing.
 //! It is subject to change and is not intended to be used by external users.
 
+use std::sync::OnceLock;
+
 use arrow_array::RecordBatch;
 use expect_test::Expect;
 use itertools::Itertools;
 
+use crate::runtime::Runtime;
+
+/// Returns a process-wide [`Runtime`] suitable for tests that need to construct
+/// a [`Table`](crate::table::Table) outside a tokio context.
+///
+/// The returned [`Runtime`] wraps a single shared multi-thread tokio runtime
+/// that is lazily built on first call and lives until process exit. Cloning is
+/// cheap, so test code can call this every time it needs a runtime to feed
+/// into [`TableBuilder::runtime`](crate::table::TableBuilder::runtime).
+pub fn test_runtime() -> Runtime {
+    static TOKIO_RT: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
+    let tokio_rt = TOKIO_RT.get_or_init(|| {
+        tokio::runtime::Builder::new_multi_thread()
+            .enable_all()
+            .build()
+            .expect("failed to build test tokio runtime")
+    });
+    Runtime::new(tokio_rt)
+}
+
 /// Snapshot testing to check the resulting record batch.
 ///
 /// - `expected_schema/data`: put `expect![[""]]` as a placeholder,
diff --git a/crates/iceberg/src/transaction/mod.rs 
b/crates/iceberg/src/transaction/mod.rs
index 159021d9f..04ee1997d 100644
--- a/crates/iceberg/src/transaction/mod.rs
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -250,6 +250,7 @@ mod tests {
         DataContentType, DataFileBuilder, DataFileFormat, Literal, Struct, 
TableMetadata,
     };
     use crate::table::Table;
+    use crate::test_utils::test_runtime;
     use crate::transaction::{ApplyTransactionAction, Transaction};
     use crate::{Catalog, Error, ErrorKind, TableCreation, TableIdent};
 
@@ -268,6 +269,7 @@ mod tests {
             .metadata_location("s3://bucket/test/location/metadata/v1.json")
             .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
             .file_io(FileIO::new_with_memory())
+            .runtime(test_runtime())
             .build()
             .unwrap()
     }
@@ -287,6 +289,7 @@ mod tests {
             .metadata_location("s3://bucket/test/location/metadata/v1.json")
             .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
             .file_io(FileIO::new_with_memory())
+            .runtime(test_runtime())
             .build()
             .unwrap()
     }
@@ -306,6 +309,7 @@ mod tests {
             .metadata_location("s3://bucket/test/location/metadata/v1.json")
             .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
             .file_io(FileIO::new_with_memory())
+            .runtime(test_runtime())
             .build()
             .unwrap()
     }
diff --git a/crates/iceberg/src/transaction/update_schema.rs 
b/crates/iceberg/src/transaction/update_schema.rs
index 6ee37be2c..da843d9b9 100644
--- a/crates/iceberg/src/transaction/update_schema.rs
+++ b/crates/iceberg/src/transaction/update_schema.rs
@@ -583,6 +583,7 @@ mod tests {
             
.metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
             .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
             .file_io(crate::io::FileIO::new_with_memory())
+            .runtime(crate::test_utils::test_runtime())
             .build()
             .unwrap()
     }
diff --git a/crates/integrations/datafusion/src/physical_plan/project.rs 
b/crates/integrations/datafusion/src/physical_plan/project.rs
index 0320d52dd..670d961f9 100644
--- a/crates/integrations/datafusion/src/physical_plan/project.rs
+++ b/crates/integrations/datafusion/src/physical_plan/project.rs
@@ -194,6 +194,7 @@ mod tests {
     use datafusion::arrow::datatypes::{DataType, Field, Fields};
     use datafusion::physical_plan::empty::EmptyExec;
     use iceberg::spec::{NestedField, PrimitiveType, Schema, StructType, Transform, Type};
+    use iceberg::test_utils::test_runtime;
 
     use super::*;
 
@@ -448,6 +449,7 @@ mod tests {
             .identifier(TableIdent::from_strs(["test", "table"]).unwrap())
             .file_io(FileIO::new_with_fs())
             .metadata_location("/test/metadata.json")
+            .runtime(test_runtime())
             .build()
             .unwrap();
 
@@ -506,6 +508,7 @@ mod tests {
             .identifier(TableIdent::from_strs(["test", "table"]).unwrap())
             .file_io(FileIO::new_with_fs())
             .metadata_location("/test/metadata.json")
+            .runtime(test_runtime())
             .build()
             .unwrap();
 
@@ -578,6 +581,7 @@ mod tests {
             .identifier(TableIdent::from_strs(["test", "table"]).unwrap())
             .file_io(FileIO::new_with_fs())
             .metadata_location("/test/metadata.json")
+            .runtime(test_runtime())
             .build()
             .unwrap();
 
diff --git a/crates/integrations/datafusion/src/physical_plan/repartition.rs b/crates/integrations/datafusion/src/physical_plan/repartition.rs
index 00552ce9a..4384983ee 100644
--- a/crates/integrations/datafusion/src/physical_plan/repartition.rs
+++ b/crates/integrations/datafusion/src/physical_plan/repartition.rs
@@ -182,6 +182,7 @@ mod tests {
         Transform, Type,
     };
     use iceberg::table::Table;
+    use iceberg::test_utils::test_runtime;
 
     use super::*;
 
@@ -223,6 +224,7 @@ mod tests {
             .identifier(TableIdent::from_strs(["test", "table"]).unwrap())
             .file_io(FileIO::new_with_fs())
             .metadata_location("/test/metadata.json")
+            .runtime(test_runtime())
             .build()
             .unwrap()
     }
@@ -380,6 +382,7 @@ mod tests {
             .identifier(TableIdent::from_strs(["test", "bucketed_table"]).unwrap())
             .file_io(FileIO::new_with_fs())
             .metadata_location("/test/bucketed_metadata.json")
+            .runtime(test_runtime())
             .build()
             .unwrap();
 
@@ -465,6 +468,7 @@ mod tests {
             .identifier(TableIdent::from_strs(["test", "partitioned_bucketed_table"]).unwrap())
             .file_io(FileIO::new_with_fs())
             .metadata_location("/test/partitioned_bucketed_metadata.json")
+            .runtime(test_runtime())
             .build()
             .unwrap();
 
@@ -549,6 +553,7 @@ mod tests {
             .identifier(TableIdent::from_strs(["test", "none_table"]).unwrap())
             .file_io(FileIO::new_with_fs())
             .metadata_location("/test/none_metadata.json")
+            .runtime(test_runtime())
             .build()
             .unwrap();
 
@@ -621,6 +626,7 @@ mod tests {
             .identifier(TableIdent::from_strs(["test", "range_only_table"]).unwrap())
             .file_io(FileIO::new_with_fs())
             .metadata_location("/test/range_only_metadata.json")
+            .runtime(test_runtime())
             .build()
             .unwrap();
 
@@ -696,6 +702,7 @@ mod tests {
             .identifier(TableIdent::from_strs(["test", "mixed_transforms_table"]).unwrap())
             .file_io(FileIO::new_with_fs())
             .metadata_location("/test/mixed_transforms_metadata.json")
+            .runtime(test_runtime())
             .build()
             .unwrap();
 
@@ -779,6 +786,7 @@ mod tests {
             .identifier(TableIdent::from_strs(["test", "temporal_partition"]).unwrap())
             .file_io(FileIO::new_with_fs())
             .metadata_location("/test/temporal_metadata.json")
+            .runtime(test_runtime())
             .build()
             .unwrap();
 
@@ -852,6 +860,7 @@ mod tests {
             .identifier(TableIdent::from_strs(["test", "identity_partition"]).unwrap())
             .file_io(FileIO::new_with_fs())
             .metadata_location("/test/identity_metadata.json")
+            .runtime(test_runtime())
             .build()
             .unwrap();
 
diff --git a/crates/sqllogictest/Cargo.toml b/crates/sqllogictest/Cargo.toml
index 35fb16b64..e826ad7ae 100644
--- a/crates/sqllogictest/Cargo.toml
+++ b/crates/sqllogictest/Cargo.toml
@@ -39,7 +39,7 @@ sqllogictest = { workspace = true }
 toml = { workspace = true }
 serde = { workspace = true }
 tracing = { workspace = true }
-tokio = { workspace = true, features = ["rt-multi-thread"] }
+tokio = { workspace = true }
 
 [dev-dependencies]
 libtest-mimic = { workspace = true }
diff --git a/crates/storage/opendal/Cargo.toml b/crates/storage/opendal/Cargo.toml
index 80eeaa3d0..55aa6ac75 100644
--- a/crates/storage/opendal/Cargo.toml
+++ b/crates/storage/opendal/Cargo.toml
@@ -55,4 +55,4 @@ futures = { workspace = true }
 async-trait = { workspace = true }
 iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
 reqwest = { workspace = true }
-tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
+tokio = { workspace = true, features = ["macros"] }


Reply via email to