This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch io_uring_tpc_fmt in repository https://gitbox.apache.org/repos/asf/iggy.git
commit afa8e0613b4c3175cecef19a3bec530f76c2164a Author: Hubert Gruszecki <[email protected]> AuthorDate: Mon Jun 30 11:43:06 2025 +0200 format code --- core/server/src/main.rs | 2 +- .../src/streaming/segments/indexes/index_reader.rs | 28 +++++++++++----------- .../src/streaming/segments/indexes/index_writer.rs | 4 ++-- core/server/src/streaming/segments/messages/mod.rs | 20 +++++++++------- .../streaming/segments/types/messages_batch_mut.rs | 2 +- .../src/streaming/segments/writing_messages.rs | 5 +--- core/server/src/streaming/streams/storage.rs | 6 ++--- core/server/src/streaming/topics/storage.rs | 2 +- 8 files changed, 34 insertions(+), 35 deletions(-) diff --git a/core/server/src/main.rs b/core/server/src/main.rs index 5875e6bdf..1b9331a1d 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -245,7 +245,7 @@ fn main() -> Result<(), ServerError> { //TODO: If one of the shards fails to initialize, we should crash the whole program; if let Err(e) = shard.run().await { error!("Failed to run shard-{id}: {e}"); - } + } //TODO: If one of the shards fails to initialize, we should crash the whole program; //shard.assert_init(); }) diff --git a/core/server/src/streaming/segments/indexes/index_reader.rs b/core/server/src/streaming/segments/indexes/index_reader.rs index 240e62a1d..8b09666a2 100644 --- a/core/server/src/streaming/segments/indexes/index_reader.rs +++ b/core/server/src/streaming/segments/indexes/index_reader.rs @@ -21,6 +21,7 @@ use crate::{io::file::IggyFile, streaming::utils::PooledBuffer}; use bytes::BytesMut; use error_set::ErrContext; use iggy_common::{INDEX_SIZE, IggyError, IggyIndex, IggyIndexView}; +use monoio::fs::OpenOptions; use std::{ fs::File as StdFile, io::ErrorKind, @@ -30,7 +31,6 @@ use std::{ atomic::{AtomicU64, Ordering}, }, }; -use monoio::fs::OpenOptions; use tracing::{error, trace}; /// A dedicated struct for reading from the index file. @@ -334,19 +334,19 @@ impl IndexReader { len: u32, use_pool: bool, ) -> Result<PooledBuffer, std::io::Error> { - if use_pool { - let mut buf = PooledBuffer::with_capacity(len as usize); - unsafe { buf.set_len(len as usize) }; - let (result, buf) = self.file.read_exact_at(buf, offset as u64).await; - result?; - Ok(buf) - } else { - let mut buf = BytesMut::with_capacity(len as usize); - unsafe { buf.set_len(len as usize) }; - let (result, buf) = self.file.read_exact_at(buf, offset as u64).await; - result?; - Ok(PooledBuffer::from_existing(buf)) - } + if use_pool { + let mut buf = PooledBuffer::with_capacity(len as usize); + unsafe { buf.set_len(len as usize) }; + let (result, buf) = self.file.read_exact_at(buf, offset as u64).await; + result?; + Ok(buf) + } else { + let mut buf = BytesMut::with_capacity(len as usize); + unsafe { buf.set_len(len as usize) }; + let (result, buf) = self.file.read_exact_at(buf, offset as u64).await; + result?; + Ok(PooledBuffer::from_existing(buf)) + } } /// Gets the nth index from the index file. diff --git a/core/server/src/streaming/segments/indexes/index_writer.rs b/core/server/src/streaming/segments/indexes/index_writer.rs index 120bfe4ea..4b79f9d2f 100644 --- a/core/server/src/streaming/segments/indexes/index_writer.rs +++ b/core/server/src/streaming/segments/indexes/index_writer.rs @@ -94,7 +94,7 @@ impl IndexWriter { } let count = indexes.len() / INDEX_SIZE; - let len = indexes.len(); + let len = indexes.len(); self.file .write_all(indexes) @@ -109,7 +109,7 @@ impl IndexWriter { .map_err(|_| IggyError::CannotSaveIndexToSegment)?; self.index_size_bytes - .fetch_add(len as u64, Ordering::Release); + .fetch_add(len as u64, Ordering::Release); if self.fsync { let _ = self.fsync().await; diff --git a/core/server/src/streaming/segments/messages/mod.rs b/core/server/src/streaming/segments/messages/mod.rs index b79b1af06..9799608f4 100644 --- a/core/server/src/streaming/segments/messages/mod.rs +++ b/core/server/src/streaming/segments/messages/mod.rs @@ -19,7 +19,7 @@ mod messages_reader; mod messages_writer; -use crate::{io::file::IggyFile}; +use crate::io::file::IggyFile; use super::IggyMessagesBatchSet; use error_set::ErrContext; @@ -29,7 +29,6 @@ use monoio::io::AsyncWriteRentExt; pub use messages_reader::MessagesReader; pub use messages_writer::MessagesWriter; - /// Vectored write a batches of messages to file async fn write_batch( file: &mut IggyFile, @@ -39,14 +38,17 @@ async fn write_batch( //let mut slices = batches.iter().map(|b| to_iovec(&b)).collect::<Vec<iovec>>(); let mut total_written = 0; // TODO: Fork monoio, piece of shit runtime. - for batch in batches.iter_mut() { + for batch in batches.iter_mut() { let messages = batch.take_messages(); - let writen = file.write_all(messages).await.0.with_error_context(|error| { - format!( - "Failed to write messages to file: {file_path}, error: {error}", - ) - // TODO: Better error variant. - }).map_err(|_| IggyError::CannotAppendMessage)?; + let writen = file + .write_all(messages) + .await + .0 + .with_error_context(|error| { + format!("Failed to write messages to file: {file_path}, error: {error}",) + // TODO: Better error variant. + }) + .map_err(|_| IggyError::CannotAppendMessage)?; total_written += writen; } Ok(total_written) diff --git a/core/server/src/streaming/segments/types/messages_batch_mut.rs b/core/server/src/streaming/segments/types/messages_batch_mut.rs index 6a76b582f..764faa112 100644 --- a/core/server/src/streaming/segments/types/messages_batch_mut.rs +++ b/core/server/src/streaming/segments/types/messages_batch_mut.rs @@ -264,7 +264,7 @@ impl IggyMessagesBatchMut { (indexes, messages) } - pub fn take_messages(&mut self) -> PooledBuffer { + pub fn take_messages(&mut self) -> PooledBuffer { std::mem::take(&mut self.messages) } diff --git a/core/server/src/streaming/segments/writing_messages.rs b/core/server/src/streaming/segments/writing_messages.rs index 8313f5899..e95d1e2cb 100644 --- a/core/server/src/streaming/segments/writing_messages.rs +++ b/core/server/src/streaming/segments/writing_messages.rs @@ -106,10 +106,7 @@ impl Segment { .save_indexes(unsaved_indexes_slice) .await .with_error_context(|error| { - format!( - "Failed to save index of {} indexes to {self}. {error}", - len - ) + format!("Failed to save index of {} indexes to {self}. {error}", len) })?; self.indexes.mark_saved(); diff --git a/core/server/src/streaming/streams/storage.rs b/core/server/src/streaming/streams/storage.rs index 5bf4c5545..502120143 100644 --- a/core/server/src/streaming/streams/storage.rs +++ b/core/server/src/streaming/streams/storage.rs @@ -26,12 +26,12 @@ use error_set::ErrContext; use futures::future::join_all; use iggy_common::IggyError; use iggy_common::IggyTimestamp; +use monoio::fs; +use monoio::fs::create_dir_all; use serde::{Deserialize, Serialize}; -use tokio::sync::Mutex; use std::path::Path; use std::sync::Arc; -use monoio::fs; -use monoio::fs::create_dir_all; +use tokio::sync::Mutex; use tracing::{error, info, warn}; #[derive(Debug)] diff --git a/core/server/src/streaming/topics/storage.rs b/core/server/src/streaming/topics/storage.rs index 1025dbd3e..46894d205 100644 --- a/core/server/src/streaming/topics/storage.rs +++ b/core/server/src/streaming/topics/storage.rs @@ -29,11 +29,11 @@ use futures::future::join_all; use iggy_common::IggyError; use iggy_common::locking::IggyRwLock; use iggy_common::locking::IggySharedMutFn; +use monoio::fs; use monoio::fs::create_dir_all; use serde::{Deserialize, Serialize}; use std::path::Path; use std::sync::Arc; -use monoio::fs; use tokio::sync::Mutex; use tracing::{error, info, warn};
