This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new d56edce2 chore: reformat code on tpc branch (#1939)
d56edce2 is described below
commit d56edce22e0ab28ae2a585f821830cc7a470507a
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Jun 30 13:26:27 2025 +0200
chore: reformat code on tpc branch (#1939)
---
core/server/src/main.rs | 2 +-
.../src/streaming/segments/indexes/index_reader.rs | 28 +++++++++++-----------
.../src/streaming/segments/indexes/index_writer.rs | 4 ++--
core/server/src/streaming/segments/messages/mod.rs | 20 +++++++++-------
.../streaming/segments/types/messages_batch_mut.rs | 2 +-
.../src/streaming/segments/writing_messages.rs | 5 +---
core/server/src/streaming/streams/storage.rs | 6 ++---
core/server/src/streaming/topics/storage.rs | 2 +-
8 files changed, 34 insertions(+), 35 deletions(-)
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 5875e6bd..1b9331a1 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -245,7 +245,7 @@ fn main() -> Result<(), ServerError> {
//TODO: If one of the shards fails to initialize, we
should crash the whole program;
if let Err(e) = shard.run().await {
error!("Failed to run shard-{id}: {e}");
- }
+ }
//TODO: If one of the shards fails to initialize, we
should crash the whole program;
//shard.assert_init();
})
diff --git a/core/server/src/streaming/segments/indexes/index_reader.rs
b/core/server/src/streaming/segments/indexes/index_reader.rs
index 240e62a1..8b09666a 100644
--- a/core/server/src/streaming/segments/indexes/index_reader.rs
+++ b/core/server/src/streaming/segments/indexes/index_reader.rs
@@ -21,6 +21,7 @@ use crate::{io::file::IggyFile,
streaming::utils::PooledBuffer};
use bytes::BytesMut;
use error_set::ErrContext;
use iggy_common::{INDEX_SIZE, IggyError, IggyIndex, IggyIndexView};
+use monoio::fs::OpenOptions;
use std::{
fs::File as StdFile,
io::ErrorKind,
@@ -30,7 +31,6 @@ use std::{
atomic::{AtomicU64, Ordering},
},
};
-use monoio::fs::OpenOptions;
use tracing::{error, trace};
/// A dedicated struct for reading from the index file.
@@ -334,19 +334,19 @@ impl IndexReader {
len: u32,
use_pool: bool,
) -> Result<PooledBuffer, std::io::Error> {
- if use_pool {
- let mut buf = PooledBuffer::with_capacity(len as usize);
- unsafe { buf.set_len(len as usize) };
- let (result, buf) = self.file.read_exact_at(buf, offset as
u64).await;
- result?;
- Ok(buf)
- } else {
- let mut buf = BytesMut::with_capacity(len as usize);
- unsafe { buf.set_len(len as usize) };
- let (result, buf) = self.file.read_exact_at(buf, offset as
u64).await;
- result?;
- Ok(PooledBuffer::from_existing(buf))
- }
+ if use_pool {
+ let mut buf = PooledBuffer::with_capacity(len as usize);
+ unsafe { buf.set_len(len as usize) };
+ let (result, buf) = self.file.read_exact_at(buf, offset as
u64).await;
+ result?;
+ Ok(buf)
+ } else {
+ let mut buf = BytesMut::with_capacity(len as usize);
+ unsafe { buf.set_len(len as usize) };
+ let (result, buf) = self.file.read_exact_at(buf, offset as
u64).await;
+ result?;
+ Ok(PooledBuffer::from_existing(buf))
+ }
}
/// Gets the nth index from the index file.
diff --git a/core/server/src/streaming/segments/indexes/index_writer.rs
b/core/server/src/streaming/segments/indexes/index_writer.rs
index 120bfe4e..4b79f9d2 100644
--- a/core/server/src/streaming/segments/indexes/index_writer.rs
+++ b/core/server/src/streaming/segments/indexes/index_writer.rs
@@ -94,7 +94,7 @@ impl IndexWriter {
}
let count = indexes.len() / INDEX_SIZE;
- let len = indexes.len();
+ let len = indexes.len();
self.file
.write_all(indexes)
@@ -109,7 +109,7 @@ impl IndexWriter {
.map_err(|_| IggyError::CannotSaveIndexToSegment)?;
self.index_size_bytes
- .fetch_add(len as u64, Ordering::Release);
+ .fetch_add(len as u64, Ordering::Release);
if self.fsync {
let _ = self.fsync().await;
diff --git a/core/server/src/streaming/segments/messages/mod.rs
b/core/server/src/streaming/segments/messages/mod.rs
index b79b1af0..9799608f 100644
--- a/core/server/src/streaming/segments/messages/mod.rs
+++ b/core/server/src/streaming/segments/messages/mod.rs
@@ -19,7 +19,7 @@
mod messages_reader;
mod messages_writer;
-use crate::{io::file::IggyFile};
+use crate::io::file::IggyFile;
use super::IggyMessagesBatchSet;
use error_set::ErrContext;
@@ -29,7 +29,6 @@ use monoio::io::AsyncWriteRentExt;
pub use messages_reader::MessagesReader;
pub use messages_writer::MessagesWriter;
-
/// Vectored write a batches of messages to file
async fn write_batch(
file: &mut IggyFile,
@@ -39,14 +38,17 @@ async fn write_batch(
//let mut slices = batches.iter().map(|b|
to_iovec(&b)).collect::<Vec<iovec>>();
let mut total_written = 0;
// TODO: Fork monoio, piece of shit runtime.
- for batch in batches.iter_mut() {
+ for batch in batches.iter_mut() {
let messages = batch.take_messages();
- let writen =
file.write_all(messages).await.0.with_error_context(|error| {
- format!(
- "Failed to write messages to file: {file_path}, error:
{error}",
- )
- // TODO: Better error variant.
- }).map_err(|_| IggyError::CannotAppendMessage)?;
+ let writen = file
+ .write_all(messages)
+ .await
+ .0
+ .with_error_context(|error| {
+ format!("Failed to write messages to file: {file_path}, error:
{error}",)
+ // TODO: Better error variant.
+ })
+ .map_err(|_| IggyError::CannotAppendMessage)?;
total_written += writen;
}
Ok(total_written)
diff --git a/core/server/src/streaming/segments/types/messages_batch_mut.rs
b/core/server/src/streaming/segments/types/messages_batch_mut.rs
index 6a76b582..764faa11 100644
--- a/core/server/src/streaming/segments/types/messages_batch_mut.rs
+++ b/core/server/src/streaming/segments/types/messages_batch_mut.rs
@@ -264,7 +264,7 @@ impl IggyMessagesBatchMut {
(indexes, messages)
}
- pub fn take_messages(&mut self) -> PooledBuffer {
+ pub fn take_messages(&mut self) -> PooledBuffer {
std::mem::take(&mut self.messages)
}
diff --git a/core/server/src/streaming/segments/writing_messages.rs
b/core/server/src/streaming/segments/writing_messages.rs
index 8313f589..e95d1e2c 100644
--- a/core/server/src/streaming/segments/writing_messages.rs
+++ b/core/server/src/streaming/segments/writing_messages.rs
@@ -106,10 +106,7 @@ impl Segment {
.save_indexes(unsaved_indexes_slice)
.await
.with_error_context(|error| {
- format!(
- "Failed to save index of {} indexes to {self}. {error}",
- len
- )
+ format!("Failed to save index of {} indexes to {self}.
{error}", len)
})?;
self.indexes.mark_saved();
diff --git a/core/server/src/streaming/streams/storage.rs
b/core/server/src/streaming/streams/storage.rs
index 5bf4c554..50212014 100644
--- a/core/server/src/streaming/streams/storage.rs
+++ b/core/server/src/streaming/streams/storage.rs
@@ -26,12 +26,12 @@ use error_set::ErrContext;
use futures::future::join_all;
use iggy_common::IggyError;
use iggy_common::IggyTimestamp;
+use monoio::fs;
+use monoio::fs::create_dir_all;
use serde::{Deserialize, Serialize};
-use tokio::sync::Mutex;
use std::path::Path;
use std::sync::Arc;
-use monoio::fs;
-use monoio::fs::create_dir_all;
+use tokio::sync::Mutex;
use tracing::{error, info, warn};
#[derive(Debug)]
diff --git a/core/server/src/streaming/topics/storage.rs
b/core/server/src/streaming/topics/storage.rs
index 1025dbd3..46894d20 100644
--- a/core/server/src/streaming/topics/storage.rs
+++ b/core/server/src/streaming/topics/storage.rs
@@ -29,11 +29,11 @@ use futures::future::join_all;
use iggy_common::IggyError;
use iggy_common::locking::IggyRwLock;
use iggy_common::locking::IggySharedMutFn;
+use monoio::fs;
use monoio::fs::create_dir_all;
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::sync::Arc;
-use monoio::fs;
use tokio::sync::Mutex;
use tracing::{error, info, warn};