blackmwk commented on code in PR #2308:
URL: https://github.com/apache/iceberg-rust/pull/2308#discussion_r3159446506


##########
crates/iceberg/src/runtime/mod.rs:
##########
@@ -39,37 +43,245 @@ impl<T: Send + 'static> Future for JoinHandle<T> {
     }
 }
 
-#[allow(dead_code)]
-pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
-where
-    F: std::future::Future + Send + 'static,
-    F::Output: Send + 'static,
-{
-    JoinHandle(task::spawn(f))
+/// Handle to a single tokio runtime. Holds an optional `Arc` to keep the
+/// runtime alive when we own it, and a `Handle` for spawning.
+#[derive(Clone)]
+pub struct RuntimeHandle {
+    /// Keeps the tokio runtime alive when we own it (`Runtime::new`).
+    /// `None` when borrowing an existing runtime via `Handle::try_current()`.
+    _owned: Option<Arc<tokio::runtime::Runtime>>,
+    handle: tokio::runtime::Handle,
 }
 
-#[allow(dead_code)]
-pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
-where
-    F: FnOnce() -> T + Send + 'static,
-    T: Send + 'static,
-{
-    JoinHandle(task::spawn_blocking(f))
+impl fmt::Debug for RuntimeHandle {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("RuntimeHandle").finish()
+    }
+}
+
+impl RuntimeHandle {
+    /// Create a handle that owns the given tokio runtime.
+    fn new(runtime: Arc<tokio::runtime::Runtime>) -> Self {
+        let handle = runtime.handle().clone();
+        Self {
+            _owned: Some(runtime),
+            handle,
+        }
+    }
+
+    /// Create a handle that borrows an existing tokio runtime via its handle.
+    fn from_handle(handle: tokio::runtime::Handle) -> Self {

Review Comment:
   ```suggestion
       fn from_tokio_handle(handle: tokio::runtime::Handle) -> Self {
   ```
   
   Make the name more readable.



##########
crates/iceberg/src/runtime/mod.rs:
##########
@@ -39,37 +43,245 @@ impl<T: Send + 'static> Future for JoinHandle<T> {
     }
 }
 
-#[allow(dead_code)]
-pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
-where
-    F: std::future::Future + Send + 'static,
-    F::Output: Send + 'static,
-{
-    JoinHandle(task::spawn(f))
+/// Handle to a single tokio runtime. Holds an optional `Arc` to keep the
+/// runtime alive when we own it, and a `Handle` for spawning.
+#[derive(Clone)]
+pub struct RuntimeHandle {
+    /// Keeps the tokio runtime alive when we own it (`Runtime::new`).
+    /// `None` when borrowing an existing runtime via `Handle::try_current()`.
+    _owned: Option<Arc<tokio::runtime::Runtime>>,

Review Comment:
   I still don't understand why we need to keep this field. Even when the
`RuntimeHandle` is created from a `Runtime`, I think keeping just the handle would be enough?



##########
crates/iceberg/src/runtime/mod.rs:
##########
@@ -17,12 +17,16 @@
 
 // This module contains the async runtime abstraction for iceberg.
 
+use std::fmt;
 use std::future::Future;
 use std::pin::Pin;
+use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use tokio::task;
+use tracing::warn;
 
+/// Wrapper around tokio's JoinHandle that panics on task failure.
 pub struct JoinHandle<T>(task::JoinHandle<T>);

Review Comment:
   Do we really need this?



##########
crates/iceberg/src/runtime/mod.rs:
##########
@@ -39,37 +43,245 @@ impl<T: Send + 'static> Future for JoinHandle<T> {
     }
 }
 
-#[allow(dead_code)]
-pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
-where
-    F: std::future::Future + Send + 'static,
-    F::Output: Send + 'static,
-{
-    JoinHandle(task::spawn(f))
+/// Handle to a single tokio runtime. Holds an optional `Arc` to keep the
+/// runtime alive when we own it, and a `Handle` for spawning.
+#[derive(Clone)]
+pub struct RuntimeHandle {
+    /// Keeps the tokio runtime alive when we own it (`Runtime::new`).
+    /// `None` when borrowing an existing runtime via `Handle::try_current()`.
+    _owned: Option<Arc<tokio::runtime::Runtime>>,
+    handle: tokio::runtime::Handle,
 }
 
-#[allow(dead_code)]
-pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
-where
-    F: FnOnce() -> T + Send + 'static,
-    T: Send + 'static,
-{
-    JoinHandle(task::spawn_blocking(f))
+impl fmt::Debug for RuntimeHandle {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("RuntimeHandle").finish()
+    }
+}
+
+impl RuntimeHandle {
+    /// Create a handle that owns the given tokio runtime.
+    fn new(runtime: Arc<tokio::runtime::Runtime>) -> Self {
+        let handle = runtime.handle().clone();
+        Self {
+            _owned: Some(runtime),
+            handle,
+        }
+    }
+
+    /// Create a handle that borrows an existing tokio runtime via its handle.
+    fn from_handle(handle: tokio::runtime::Handle) -> Self {
+        Self {
+            _owned: None,
+            handle,
+        }
+    }
+
+    /// Spawn an async task.
+    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
+    where
+        F: Future + Send + 'static,
+        F::Output: Send + 'static,
+    {
+        JoinHandle(self.handle.spawn(future))
+    }
+
+    /// Spawn a blocking task.
+    pub fn spawn_blocking<F, T>(&self, f: F) -> JoinHandle<T>
+    where
+        F: FnOnce() -> T + Send + 'static,
+        T: Send + 'static,
+    {
+        JoinHandle(self.handle.spawn_blocking(f))
+    }
+}
+
+/// Iceberg's runtime abstraction.
+///
+/// Contains separate handles for IO-bound and CPU-bound work. When constructed
+/// with a single tokio runtime, both `io()` and `cpu()` route to the same one.
+/// Use `new_with_split` to provide dedicated runtimes for each category.
+///
+/// Cloning is cheap (Arc clones internally).
+#[derive(Clone)]
+pub struct Runtime {
+    io: RuntimeHandle,
+    cpu: RuntimeHandle,
+}
+
+impl fmt::Debug for Runtime {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Runtime").finish()
+    }
+}
+
+impl Runtime {
+    /// Create a Runtime backed by a single tokio runtime for all work.
+    pub fn new(runtime: Arc<tokio::runtime::Runtime>) -> Self {
+        let handle = RuntimeHandle::new(runtime);
+        Self {
+            io: handle.clone(),
+            cpu: handle,
+        }
+    }
+
+    /// Create a Runtime with separate tokio runtimes for IO and CPU work.
+    pub fn new_with_split(
+        io_runtime: Arc<tokio::runtime::Runtime>,
+        cpu_runtime: Arc<tokio::runtime::Runtime>,
+    ) -> Self {
+        Self {
+            io: RuntimeHandle::new(io_runtime),
+            cpu: RuntimeHandle::new(cpu_runtime),
+        }
+    }
+
+    /// Handle for IO-bound work (network fetches, file reads).
+    pub fn io(&self) -> &RuntimeHandle {
+        &self.io
+    }
+
+    /// Handle for CPU-bound work (decoding, predicate eval, projection).
+    pub fn cpu(&self) -> &RuntimeHandle {
+        &self.cpu
+    }
+}
+
+impl Default for Runtime {
+    fn default() -> Self {
+        if let Ok(handle) = tokio::runtime::Handle::try_current() {
+            let rh = RuntimeHandle::from_handle(handle);
+            return Self {
+                io: rh.clone(),
+                cpu: rh,
+            };
+        }
+
+        warn!(
+            "No tokio runtime found. Creating a new multi-thread runtime for 
iceberg. \
+             Consider providing an explicit Runtime via 
CatalogBuilder::with_runtime() \
+             or TableBuilder::runtime() to avoid unexpected resource usage."
+        );
+
+        let rt = Arc::new(

Review Comment:
   I'd prefer not to do this, since it makes things complicated and we would have to 
own the tokio runtime. For users with simple use cases, tokio already provides enough 
facilities to keep things simple, and we only need to fetch the handle from the current 
runtime. I would recommend adding a `try_from_current` method to `Runtime` instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to