This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch io_uring_tpc_fmt
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit afa8e0613b4c3175cecef19a3bec530f76c2164a
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Jun 30 11:43:06 2025 +0200

    format code
---
 core/server/src/main.rs                            |  2 +-
 .../src/streaming/segments/indexes/index_reader.rs | 28 +++++++++++-----------
 .../src/streaming/segments/indexes/index_writer.rs |  4 ++--
 core/server/src/streaming/segments/messages/mod.rs | 20 +++++++++-------
 .../streaming/segments/types/messages_batch_mut.rs |  2 +-
 .../src/streaming/segments/writing_messages.rs     |  5 +---
 core/server/src/streaming/streams/storage.rs       |  6 ++---
 core/server/src/streaming/topics/storage.rs        |  2 +-
 8 files changed, 34 insertions(+), 35 deletions(-)

diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 5875e6bdf..1b9331a1d 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -245,7 +245,7 @@ fn main() -> Result<(), ServerError> {
                     //TODO: If one of the shards fails to initialize, we should crash the whole program;
                     if let Err(e) = shard.run().await {
                         error!("Failed to run shard-{id}: {e}");
-                    } 
+                    }
                     //TODO: If one of the shards fails to initialize, we should crash the whole program;
                     //shard.assert_init();
                 })
diff --git a/core/server/src/streaming/segments/indexes/index_reader.rs b/core/server/src/streaming/segments/indexes/index_reader.rs
index 240e62a1d..8b09666a2 100644
--- a/core/server/src/streaming/segments/indexes/index_reader.rs
+++ b/core/server/src/streaming/segments/indexes/index_reader.rs
@@ -21,6 +21,7 @@ use crate::{io::file::IggyFile, streaming::utils::PooledBuffer};
 use bytes::BytesMut;
 use error_set::ErrContext;
 use iggy_common::{INDEX_SIZE, IggyError, IggyIndex, IggyIndexView};
+use monoio::fs::OpenOptions;
 use std::{
     fs::File as StdFile,
     io::ErrorKind,
@@ -30,7 +31,6 @@ use std::{
         atomic::{AtomicU64, Ordering},
     },
 };
-use monoio::fs::OpenOptions;
 use tracing::{error, trace};
 
 /// A dedicated struct for reading from the index file.
@@ -334,19 +334,19 @@ impl IndexReader {
         len: u32,
         use_pool: bool,
     ) -> Result<PooledBuffer, std::io::Error> {
-            if use_pool {
-                let mut buf = PooledBuffer::with_capacity(len as usize);
-                unsafe { buf.set_len(len as usize) };
-                let (result, buf) = self.file.read_exact_at(buf, offset as u64).await;
-                result?;
-                Ok(buf)
-            } else {
-                let mut buf = BytesMut::with_capacity(len as usize);
-                unsafe { buf.set_len(len as usize) };
-                let (result, buf) = self.file.read_exact_at(buf, offset as u64).await;
-                result?;
-                Ok(PooledBuffer::from_existing(buf))
-            }
+        if use_pool {
+            let mut buf = PooledBuffer::with_capacity(len as usize);
+            unsafe { buf.set_len(len as usize) };
+            let (result, buf) = self.file.read_exact_at(buf, offset as u64).await;
+            result?;
+            Ok(buf)
+        } else {
+            let mut buf = BytesMut::with_capacity(len as usize);
+            unsafe { buf.set_len(len as usize) };
+            let (result, buf) = self.file.read_exact_at(buf, offset as u64).await;
+            result?;
+            Ok(PooledBuffer::from_existing(buf))
+        }
     }
 
     /// Gets the nth index from the index file.
diff --git a/core/server/src/streaming/segments/indexes/index_writer.rs b/core/server/src/streaming/segments/indexes/index_writer.rs
index 120bfe4ea..4b79f9d2f 100644
--- a/core/server/src/streaming/segments/indexes/index_writer.rs
+++ b/core/server/src/streaming/segments/indexes/index_writer.rs
@@ -94,7 +94,7 @@ impl IndexWriter {
         }
 
         let count = indexes.len() / INDEX_SIZE;
-        let len =  indexes.len();
+        let len = indexes.len();
 
         self.file
             .write_all(indexes)
@@ -109,7 +109,7 @@ impl IndexWriter {
             .map_err(|_| IggyError::CannotSaveIndexToSegment)?;
 
         self.index_size_bytes
-            .fetch_add(len  as u64, Ordering::Release);
+            .fetch_add(len as u64, Ordering::Release);
 
         if self.fsync {
             let _ = self.fsync().await;
diff --git a/core/server/src/streaming/segments/messages/mod.rs b/core/server/src/streaming/segments/messages/mod.rs
index b79b1af06..9799608f4 100644
--- a/core/server/src/streaming/segments/messages/mod.rs
+++ b/core/server/src/streaming/segments/messages/mod.rs
@@ -19,7 +19,7 @@
 mod messages_reader;
 mod messages_writer;
 
-use crate::{io::file::IggyFile};
+use crate::io::file::IggyFile;
 
 use super::IggyMessagesBatchSet;
 use error_set::ErrContext;
@@ -29,7 +29,6 @@ use monoio::io::AsyncWriteRentExt;
 pub use messages_reader::MessagesReader;
 pub use messages_writer::MessagesWriter;
 
-
 /// Vectored write a batches of messages to file
 async fn write_batch(
     file: &mut IggyFile,
@@ -39,14 +38,17 @@ async fn write_batch(
     //let mut slices = batches.iter().map(|b| to_iovec(&b)).collect::<Vec<iovec>>();
     let mut total_written = 0;
     // TODO: Fork monoio, piece of shit runtime.
-    for batch in batches.iter_mut()  {
+    for batch in batches.iter_mut() {
         let messages = batch.take_messages();
-        let writen = file.write_all(messages).await.0.with_error_context(|error| {
-            format!(
-                "Failed to write messages to file: {file_path}, error: {error}",
-            )
-            // TODO: Better error variant.
-        }).map_err(|_| IggyError::CannotAppendMessage)?;
+        let writen = file
+            .write_all(messages)
+            .await
+            .0
+            .with_error_context(|error| {
+                format!("Failed to write messages to file: {file_path}, error: {error}",)
+                // TODO: Better error variant.
+            })
+            .map_err(|_| IggyError::CannotAppendMessage)?;
         total_written += writen;
     }
     Ok(total_written)
diff --git a/core/server/src/streaming/segments/types/messages_batch_mut.rs b/core/server/src/streaming/segments/types/messages_batch_mut.rs
index 6a76b582f..764faa112 100644
--- a/core/server/src/streaming/segments/types/messages_batch_mut.rs
+++ b/core/server/src/streaming/segments/types/messages_batch_mut.rs
@@ -264,7 +264,7 @@ impl IggyMessagesBatchMut {
         (indexes, messages)
     }
 
-    pub fn take_messages(&mut self) ->  PooledBuffer {
+    pub fn take_messages(&mut self) -> PooledBuffer {
         std::mem::take(&mut self.messages)
     }
 
diff --git a/core/server/src/streaming/segments/writing_messages.rs b/core/server/src/streaming/segments/writing_messages.rs
index 8313f5899..e95d1e2cb 100644
--- a/core/server/src/streaming/segments/writing_messages.rs
+++ b/core/server/src/streaming/segments/writing_messages.rs
@@ -106,10 +106,7 @@ impl Segment {
             .save_indexes(unsaved_indexes_slice)
             .await
             .with_error_context(|error| {
-                format!(
-                    "Failed to save index of {} indexes to {self}. {error}",
-                    len
-                )
+                format!("Failed to save index of {} indexes to {self}. {error}", len)
             })?;
 
         self.indexes.mark_saved();
diff --git a/core/server/src/streaming/streams/storage.rs b/core/server/src/streaming/streams/storage.rs
index 5bf4c5545..502120143 100644
--- a/core/server/src/streaming/streams/storage.rs
+++ b/core/server/src/streaming/streams/storage.rs
@@ -26,12 +26,12 @@ use error_set::ErrContext;
 use futures::future::join_all;
 use iggy_common::IggyError;
 use iggy_common::IggyTimestamp;
+use monoio::fs;
+use monoio::fs::create_dir_all;
 use serde::{Deserialize, Serialize};
-use tokio::sync::Mutex;
 use std::path::Path;
 use std::sync::Arc;
-use monoio::fs;
-use monoio::fs::create_dir_all;
+use tokio::sync::Mutex;
 use tracing::{error, info, warn};
 
 #[derive(Debug)]
diff --git a/core/server/src/streaming/topics/storage.rs b/core/server/src/streaming/topics/storage.rs
index 1025dbd3e..46894d205 100644
--- a/core/server/src/streaming/topics/storage.rs
+++ b/core/server/src/streaming/topics/storage.rs
@@ -29,11 +29,11 @@ use futures::future::join_all;
 use iggy_common::IggyError;
 use iggy_common::locking::IggyRwLock;
 use iggy_common::locking::IggySharedMutFn;
+use monoio::fs;
 use monoio::fs::create_dir_all;
 use serde::{Deserialize, Serialize};
 use std::path::Path;
 use std::sync::Arc;
-use monoio::fs;
 use tokio::sync::Mutex;
 use tracing::{error, info, warn};
 

Reply via email to