This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch io_uring_tpc_task_registry
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 1fe834c3e1594cfc4f2e9f254bfbd34bb3bd550e
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Sep 24 16:30:33 2025 +0200

    oneshot fsync
---
 core/server/src/shard/mod.rs                 |  1 -
 core/server/src/shard/stats.rs               |  1 -
 core/server/src/shard/tasks/mod.rs           |  3 +-
 core/server/src/shard/tasks/oneshot/fsync.rs | 87 ----------------------------
 core/server/src/shard/tasks/oneshot/mod.rs   | 23 --------
 core/server/src/shard/tasks/supervisor.rs    | 46 ++++++++++++++-
 core/server/src/shard/tasks/tls.rs           |  5 --
 core/server/src/slab/streams.rs              | 39 +++++++++----
 8 files changed, 71 insertions(+), 134 deletions(-)

diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 639e84ff..8a54564e 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -19,7 +19,6 @@ inner() * or more contributor license agreements.  See the 
NOTICE file
 pub mod builder;
 pub mod logging;
 pub mod namespace;
-pub mod stats;
 pub mod system;
 pub mod tasks;
 pub mod transmission;
diff --git a/core/server/src/shard/stats.rs b/core/server/src/shard/stats.rs
deleted file mode 100644
index 8b137891..00000000
--- a/core/server/src/shard/stats.rs
+++ /dev/null
@@ -1 +0,0 @@
-
diff --git a/core/server/src/shard/tasks/mod.rs 
b/core/server/src/shard/tasks/mod.rs
index f6fc2c8c..640d6dc2 100644
--- a/core/server/src/shard/tasks/mod.rs
+++ b/core/server/src/shard/tasks/mod.rs
@@ -22,7 +22,6 @@
 //! within a shard, including servers, periodic maintenance, and one-shot 
operations.
 
 pub mod continuous;
-pub mod oneshot;
 pub mod periodic;
 pub mod shutdown;
 pub mod specs;
@@ -32,7 +31,7 @@ pub mod tls;
 pub use shutdown::{Shutdown, ShutdownToken};
 pub use specs::{TaskCtx, TaskFuture, TaskKind, TaskScope, TaskSpec};
 pub use supervisor::TaskSupervisor;
-pub use tls::{init_supervisor, task_supervisor};
+pub use tls::{init_supervisor, is_supervisor_initialized, task_supervisor};
 
 use crate::shard::IggyShard;
 use specs::IsNoOp;
diff --git a/core/server/src/shard/tasks/oneshot/fsync.rs 
b/core/server/src/shard/tasks/oneshot/fsync.rs
deleted file mode 100644
index d03c1ad6..00000000
--- a/core/server/src/shard/tasks/oneshot/fsync.rs
+++ /dev/null
@@ -1,87 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::shard::tasks::{TaskCtx, TaskFuture, TaskKind, TaskScope, TaskSpec, 
specs::OneShotSpec};
-use compio::fs::File;
-use iggy_common::IggyError;
-use std::{path::PathBuf, time::Duration};
-use tracing::{debug, error, trace};
-
-/// One-shot task for performing fsync on critical files
-#[derive(Debug)]
-pub struct FsyncTask {
-    path: PathBuf,
-    description: String,
-}
-
-impl FsyncTask {
-    pub fn new(path: PathBuf, description: String) -> Self {
-        Self { path, description }
-    }
-}
-
-impl TaskSpec for FsyncTask {
-    fn name(&self) -> &'static str {
-        "fsync"
-    }
-
-    fn kind(&self) -> TaskKind {
-        TaskKind::OneShot
-    }
-
-    fn scope(&self) -> TaskScope {
-        TaskScope::AllShards
-    }
-
-    fn is_critical(&self) -> bool {
-        true
-    }
-
-    fn run(self: Box<Self>, _ctx: TaskCtx) -> TaskFuture {
-        Box::pin(async move {
-            trace!(
-                "Performing fsync on {} - {}",
-                self.path.display(),
-                self.description
-            );
-
-            match File::open(&self.path).await {
-                Ok(file) => match file.sync_all().await {
-                    Ok(_) => {
-                        debug!("Successfully synced {} to disk", 
self.path.display());
-                        Ok(())
-                    }
-                    Err(e) => {
-                        error!("Failed to fsync {}: {}", self.path.display(), 
e);
-                        Err(IggyError::CannotSyncFile)
-                    }
-                },
-                Err(e) => {
-                    error!("Failed to open {} for fsync: {}", 
self.path.display(), e);
-                    Err(IggyError::CannotReadFile)
-                }
-            }
-        })
-    }
-}
-
-impl OneShotSpec for FsyncTask {
-    fn timeout(&self) -> Option<Duration> {
-        Some(Duration::from_secs(30)) // 30 second timeout for fsync
-    }
-}
diff --git a/core/server/src/shard/tasks/oneshot/mod.rs 
b/core/server/src/shard/tasks/oneshot/mod.rs
deleted file mode 100644
index a07e8582..00000000
--- a/core/server/src/shard/tasks/oneshot/mod.rs
+++ /dev/null
@@ -1,23 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-//! One-shot task specifications for durability and administrative operations
-
-pub mod fsync;
-
-pub use fsync::FsyncTask;
diff --git a/core/server/src/shard/tasks/supervisor.rs 
b/core/server/src/shard/tasks/supervisor.rs
index a97b1fd1..5f999779 100644
--- a/core/server/src/shard/tasks/supervisor.rs
+++ b/core/server/src/shard/tasks/supervisor.rs
@@ -123,7 +123,7 @@ impl TaskSupervisor {
         let handle = match kind {
             TaskKind::Continuous => self.spawn_continuous(spec, ctx),
             TaskKind::Periodic { period } => self.spawn_periodic(spec, ctx, 
period),
-            TaskKind::OneShot => self.spawn_oneshot(spec, ctx),
+            TaskKind::OneShot => self.spawn_oneshot_spec(spec, ctx),
         };
 
         let task_handle = TaskHandle {
@@ -196,8 +196,8 @@ impl TaskSupervisor {
         })
     }
 
-    /// Spawn a oneshot task
-    fn spawn_oneshot(
+    /// Spawn a oneshot task from a TaskSpec
+    fn spawn_oneshot_spec(
         &self,
         spec: Box<dyn TaskSpec>,
         ctx: TaskCtx,
@@ -390,6 +390,46 @@ impl TaskSupervisor {
         });
     }
 
+    /// Spawn a oneshot task directly without going through TaskSpec
+    pub fn spawn_oneshot<F>(&self, name: impl Into<String>, critical: bool, f: 
F)
+    where
+        F: Future<Output = Result<(), IggyError>> + 'static,
+    {
+        let name = name.into();
+        let shard_id = self.shard_id;
+
+        trace!("Spawning oneshot task '{}' on shard {}", name, shard_id);
+
+        let task_name = name.clone();
+        let handle = compio::runtime::spawn(async move {
+            trace!(
+                "OneShot task '{}' starting on shard {}",
+                task_name, shard_id
+            );
+            let result = f.await;
+
+            match &result {
+                Ok(()) => trace!(
+                    "OneShot task '{}' completed on shard {}",
+                    task_name, shard_id
+                ),
+                Err(e) => error!(
+                    "OneShot task '{}' failed on shard {}: {}",
+                    task_name, shard_id, e
+                ),
+            }
+
+            result
+        });
+
+        self.oneshot_handles.borrow_mut().push(TaskHandle {
+            name,
+            kind: TaskKind::OneShot,
+            handle,
+            is_critical: critical,
+        });
+    }
+
     /// Shutdown all connections gracefully
     async fn shutdown_connections(&self) {
         info!(
diff --git a/core/server/src/shard/tasks/tls.rs 
b/core/server/src/shard/tasks/tls.rs
index 729f7027..1370c5e6 100644
--- a/core/server/src/shard/tasks/tls.rs
+++ b/core/server/src/shard/tasks/tls.rs
@@ -83,11 +83,6 @@ mod tests {
         // Now it should be initialized
         assert!(is_supervisor_initialized());
 
-        // Should be able to get supervisor without panic
-        let supervisor = task_supervisor();
-        // Note: We can't directly check the shard_id since it's private,
-        // but we can verify the supervisor exists
-
         // Clean up
         clear_supervisor();
         assert!(!is_supervisor_initialized());
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index edd76ae5..bceb3baf 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -721,18 +721,33 @@ impl Streams {
                 log.active_storage_mut().shutdown()
             });
 
-        // TODO(hubcio): These fsync operations should use 
TaskSupervisor::spawn_oneshot with FsyncTask
-        // for proper tracking and graceful shutdown. However, this requires 
passing the shard
-        // reference through multiple layers. For now, we spawn directly and 
hope for the best. YOLO.
-        compio::runtime::spawn(async move {
-            let _ = log_writer.fsync().await;
-        })
-        .detach();
-        compio::runtime::spawn(async move {
-            let _ = index_writer.fsync().await;
-            drop(index_writer)
-        })
-        .detach();
+        // Use task supervisor for proper tracking and graceful shutdown
+        use crate::shard::tasks::tls::task_supervisor;
+        use tracing::error;
+
+        task_supervisor().spawn_oneshot("fsync:segment-close-messages", true, 
async move {
+            match log_writer.fsync().await {
+                Ok(_) => Ok(()),
+                Err(e) => {
+                    error!("Failed to fsync log writer on segment close: {}", 
e);
+                    Err(e)
+                }
+            }
+        });
+
+        task_supervisor().spawn_oneshot("fsync:segment-close-index", true, 
async move {
+            match index_writer.fsync().await {
+                Ok(_) => {
+                    drop(index_writer);
+                    Ok(())
+                }
+                Err(e) => {
+                    error!("Failed to fsync index writer on segment close: 
{}", e);
+                    drop(index_writer);
+                    Err(e)
+                }
+            }
+        });
 
         let (start_offset, size, end_offset) =
             self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., 
log)| {

Reply via email to