This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new 1fca8609 fix(io_uring): fix lints (#2240)
1fca8609 is described below
commit 1fca8609f762e7e3cfcca5cdfdc60fdaa4318f96
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Oct 6 16:53:00 2025 +0200
fix(io_uring): fix lints (#2240)
Co-authored-by: numminex <[email protected]>
---
core/bench/src/actors/consumer/client/low_level.rs | 7 +-
.../benchmark_producing_consumer.rs | 96 +++---
.../tests/cli/stream/test_stream_list_command.rs | 12 +-
core/integration/tests/server/scenarios/mod.rs | 1 -
.../tests/server/scenarios/system_scenario.rs | 6 +-
.../tests/streaming/common/test_setup.rs | 105 +------
core/integration/tests/streaming/get_by_offset.rs | 12 +-
core/integration/tests/streaming/mod.rs | 32 +-
.../delete_consumer_offset_handler.rs | 10 +-
.../store_consumer_offset_handler.rs | 11 +-
.../binary/handlers/streams/get_stream_handler.rs | 2 +-
.../binary/handlers/topics/create_topic_handler.rs | 4 +-
.../binary/handlers/topics/get_topics_handler.rs | 2 +-
core/server/src/binary/mapper.rs | 2 +-
core/server/src/bootstrap.rs | 4 +-
core/server/src/http/http_server.rs | 3 +-
core/server/src/http/mapper.rs | 10 +-
core/server/src/http/streams.rs | 2 +-
core/server/src/lib.rs | 9 -
core/server/src/main.rs | 4 +-
core/server/src/shard/builder.rs | 5 +-
core/server/src/shard/mod.rs | 43 +--
core/server/src/shard/system/messages.rs | 4 +-
core/server/src/shard/system/partitions.rs | 4 +-
.../src/shard/system/personal_access_tokens.rs | 2 +-
core/server/src/shard/system/streams.rs | 3 +-
core/server/src/shard/system/topics.rs | 36 +--
core/server/src/shard/system/users.rs | 10 +-
core/server/src/shard/system/utils.rs | 4 +-
core/server/src/shard/transmission/event.rs | 14 -
core/server/src/slab/consumer_groups.rs | 4 +
core/server/src/slab/helpers.rs | 12 -
core/server/src/slab/partitions.rs | 6 +-
core/server/src/slab/streams.rs | 70 ++---
core/server/src/slab/topics.rs | 6 +-
core/server/src/state/file.rs | 2 +-
core/server/src/state/system.rs | 23 +-
.../deduplication/message_deduplicator.rs | 5 -
.../src/streaming/partitions/consumer_offset.rs | 4 +-
core/server/src/streaming/partitions/helpers.rs | 2 -
core/server/src/streaming/partitions/journal.rs | 2 +-
core/server/src/streaming/partitions/log.rs | 8 +-
core/server/src/streaming/partitions/partition2.rs | 3 +-
core/server/src/streaming/partitions/storage2.rs | 11 +-
core/server/src/streaming/persistence/mod.rs | 1 -
core/server/src/streaming/persistence/task.rs | 135 ---------
.../src/streaming/segments/indexes/index_reader.rs | 2 +-
core/server/src/streaming/stats/mod.rs | 332 ++++++++++++++++++++-
core/server/src/streaming/stats/stats.rs | 331 --------------------
core/server/src/streaming/streams/stream2.rs | 2 +-
.../server/src/streaming/topics/consumer_group2.rs | 4 +-
core/server/src/streaming/topics/helpers.rs | 38 +--
core/server/src/streaming/topics/storage2.rs | 5 +-
core/server/src/streaming/topics/topic2.rs | 3 +-
core/server/src/streaming/utils/memory_pool.rs | 6 +-
core/server/src/tcp/connection_handler.rs | 2 +-
core/server/src/tcp/tcp_listener.rs | 3 +-
core/server/src/tcp/tcp_server.rs | 1 -
core/server/src/tcp/tcp_tls_listener.rs | 1 -
59 files changed, 519 insertions(+), 964 deletions(-)
diff --git a/core/bench/src/actors/consumer/client/low_level.rs
b/core/bench/src/actors/consumer/client/low_level.rs
index 4c80c4ce..27feb3a4 100644
--- a/core/bench/src/actors/consumer/client/low_level.rs
+++ b/core/bench/src/actors/consumer/client/low_level.rs
@@ -101,11 +101,8 @@ impl ConsumerClient for LowLevelConsumerClient {
let total_bytes = batch_total_size_bytes(&polled);
self.offset += messages_count;
- match self.polling_strategy.kind {
- PollingKind::Offset => {
- self.polling_strategy.value += messages_count;
- }
- _ => {}
+ if self.polling_strategy.kind == PollingKind::Offset {
+ self.polling_strategy.value += messages_count;
}
Ok(Some(BatchMetrics {
diff --git
a/core/bench/src/actors/producing_consumer/benchmark_producing_consumer.rs
b/core/bench/src/actors/producing_consumer/benchmark_producing_consumer.rs
index 0f5d323a..a676d449 100644
--- a/core/bench/src/actors/producing_consumer/benchmark_producing_consumer.rs
+++ b/core/bench/src/actors/producing_consumer/benchmark_producing_consumer.rs
@@ -146,63 +146,63 @@ where
if is_producer
&& !self.send_finish_condition.is_done()
&& (!require_reply || !awaiting_reply)
+ && let Some(batch) = self.producer.produce_batch(&mut
batch_generator).await?
{
- if let Some(batch) = self.producer.produce_batch(&mut
batch_generator).await? {
- rl_value += batch.user_data_bytes;
- sent_user_bytes += batch.user_data_bytes;
- sent_total_bytes += batch.total_bytes;
- sent_messages += u64::from(batch.messages);
- sent_batches += 1;
- awaiting_reply = is_consumer;
+ rl_value += batch.user_data_bytes;
+ sent_user_bytes += batch.user_data_bytes;
+ sent_total_bytes += batch.total_bytes;
+ sent_messages += u64::from(batch.messages);
+ sent_batches += 1;
+ awaiting_reply = is_consumer;
- if self
- .send_finish_condition
- .account_and_check(batch.user_data_bytes)
- {
- info!(
- "ProducingConsumer #{actor_id} → finished sending
{sent_messages} messages in {sent_batches} batches ({sent_user_bytes} bytes of
user data, {sent_total_bytes} bytes of total data), send finish condition:
{send_status}, poll finish condition: {poll_status}",
- actor_id = self.producer_config.producer_id,
- sent_messages = sent_messages.human_count_bare(),
- sent_batches = sent_batches.human_count_bare(),
- sent_user_bytes =
sent_user_bytes.human_count_bytes(),
- sent_total_bytes =
sent_total_bytes.human_count_bytes(),
- send_status = self.send_finish_condition.status(),
- poll_status = self.poll_finish_condition.status()
- );
- }
+ if self
+ .send_finish_condition
+ .account_and_check(batch.user_data_bytes)
+ {
+ info!(
+ "ProducingConsumer #{actor_id} → finished sending
{sent_messages} messages in {sent_batches} batches ({sent_user_bytes} bytes of
user data, {sent_total_bytes} bytes of total data), send finish condition:
{send_status}, poll finish condition: {poll_status}",
+ actor_id = self.producer_config.producer_id,
+ sent_messages = sent_messages.human_count_bare(),
+ sent_batches = sent_batches.human_count_bare(),
+ sent_user_bytes = sent_user_bytes.human_count_bytes(),
+ sent_total_bytes =
sent_total_bytes.human_count_bytes(),
+ send_status = self.send_finish_condition.status(),
+ poll_status = self.poll_finish_condition.status()
+ );
}
}
- if is_consumer && !self.poll_finish_condition.is_done() {
- if let Some(batch) = self.consumer.consume_batch().await? {
- rl_value += batch.user_data_bytes;
- recv_user_bytes += batch.user_data_bytes;
- recv_total_bytes += batch.total_bytes;
- recv_messages += u64::from(batch.messages);
- recv_batches += 1;
+ if is_consumer
+ && !self.poll_finish_condition.is_done()
+ && let Some(batch) = self.consumer.consume_batch().await?
+ {
+ rl_value += batch.user_data_bytes;
+ recv_user_bytes += batch.user_data_bytes;
+ recv_total_bytes += batch.total_bytes;
+ recv_messages += u64::from(batch.messages);
+ recv_batches += 1;
- let elapsed =
u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
- let latency =
u64::try_from(batch.latency.as_micros()).unwrap_or(u64::MAX);
+ let elapsed =
u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
+ let latency =
u64::try_from(batch.latency.as_micros()).unwrap_or(u64::MAX);
- records.push(BenchmarkRecord {
- elapsed_time_us: elapsed,
- latency_us: latency,
- messages: sent_messages + recv_messages,
- message_batches: sent_batches + recv_batches,
- user_data_bytes: sent_user_bytes + recv_user_bytes,
- total_bytes: sent_total_bytes + recv_total_bytes,
- });
+ records.push(BenchmarkRecord {
+ elapsed_time_us: elapsed,
+ latency_us: latency,
+ messages: sent_messages + recv_messages,
+ message_batches: sent_batches + recv_batches,
+ user_data_bytes: sent_user_bytes + recv_user_bytes,
+ total_bytes: sent_total_bytes + recv_total_bytes,
+ });
- if let Some(limiter) = &rate_limiter {
- limiter.wait_until_necessary(rl_value).await;
- rl_value = 0;
- }
+ if let Some(limiter) = &rate_limiter {
+ limiter.wait_until_necessary(rl_value).await;
+ rl_value = 0;
+ }
- self.poll_finish_condition
- .account_and_check(batch.user_data_bytes);
- if require_reply {
- awaiting_reply = false;
- }
+ self.poll_finish_condition
+ .account_and_check(batch.user_data_bytes);
+ if require_reply {
+ awaiting_reply = false;
}
}
}
diff --git a/core/integration/tests/cli/stream/test_stream_list_command.rs
b/core/integration/tests/cli/stream/test_stream_list_command.rs
index 5f879fc0..610e0384 100644
--- a/core/integration/tests/cli/stream/test_stream_list_command.rs
+++ b/core/integration/tests/cli/stream/test_stream_list_command.rs
@@ -27,18 +27,13 @@ use predicates::str::{contains, starts_with};
use serial_test::parallel;
struct TestStreamListCmd {
- stream_id: u32,
name: String,
output: OutputFormat,
}
impl TestStreamListCmd {
- fn new(stream_id: u32, name: String, output: OutputFormat) -> Self {
- Self {
- stream_id,
- name,
- output,
- }
+ fn new(name: String, output: OutputFormat) -> Self {
+ Self { name, output }
}
fn to_args(&self) -> Vec<&str> {
@@ -82,21 +77,18 @@ pub async fn should_be_successful() {
iggy_cmd_test.setup().await;
iggy_cmd_test
.execute_test(TestStreamListCmd::new(
- 1,
String::from("prod"),
OutputFormat::Default,
))
.await;
iggy_cmd_test
.execute_test(TestStreamListCmd::new(
- 2,
String::from("testing"),
OutputFormat::List,
))
.await;
iggy_cmd_test
.execute_test(TestStreamListCmd::new(
- 3,
String::from("misc"),
OutputFormat::Table,
))
diff --git a/core/integration/tests/server/scenarios/mod.rs
b/core/integration/tests/server/scenarios/mod.rs
index c63cc212..0e2aa594 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -41,7 +41,6 @@ const CONSUMER_GROUP_NAME: &str = "test-consumer-group";
const USERNAME_1: &str = "user1";
const USERNAME_2: &str = "user2";
const USERNAME_3: &str = "user3";
-const CONSUMER_ID: u32 = 1;
const CONSUMER_KIND: ConsumerKind = ConsumerKind::Consumer;
const MESSAGES_COUNT: u32 = 1337;
diff --git a/core/integration/tests/server/scenarios/system_scenario.rs
b/core/integration/tests/server/scenarios/system_scenario.rs
index 02072cf5..61163cde 100644
--- a/core/integration/tests/server/scenarios/system_scenario.rs
+++ b/core/integration/tests/server/scenarios/system_scenario.rs
@@ -140,14 +140,12 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(topic.partitions.len(), PARTITIONS_COUNT as usize);
assert_eq!(topic.size, 0);
assert_eq!(topic.messages_count, 0);
- let mut id = 0;
- for topic_partition in topic.partitions {
- assert_eq!(topic_partition.id, id);
+ for (id, topic_partition) in topic.partitions.iter().enumerate() {
+ assert_eq!(topic_partition.id, id as u32);
assert_eq!(topic_partition.segments_count, 1);
assert_eq!(topic_partition.size, 0);
assert_eq!(topic_partition.current_offset, 0);
assert_eq!(topic_partition.messages_count, 0);
- id += 1;
}
// 12. Get topic details by name
diff --git a/core/integration/tests/streaming/common/test_setup.rs
b/core/integration/tests/streaming/common/test_setup.rs
index d22b16ea..cc461f89 100644
--- a/core/integration/tests/streaming/common/test_setup.rs
+++ b/core/integration/tests/streaming/common/test_setup.rs
@@ -19,16 +19,12 @@
use compio::fs;
use server::bootstrap::create_directories;
use server::configs::system::SystemConfig;
-use server::streaming::persistence::persister::{FileWithSyncPersister,
PersisterKind};
-use server::streaming::storage::SystemStorage;
use server::streaming::utils::MemoryPool;
-use std::rc::Rc;
use std::sync::Arc;
use uuid::Uuid;
pub struct TestSetup {
pub config: Arc<SystemConfig>,
- pub storage: Rc<SystemStorage>,
}
impl TestSetup {
@@ -44,107 +40,8 @@ impl TestSetup {
let config = Arc::new(config);
fs::create_dir(config.get_system_path()).await.unwrap();
create_directories(&config).await.unwrap();
- let persister = PersisterKind::FileWithSync(FileWithSyncPersister {});
- let storage = Rc::new(SystemStorage::new(config.clone(),
Arc::new(persister)));
MemoryPool::init_pool(config.clone());
- TestSetup { config, storage }
- }
-
- pub async fn create_streams_directory(&self) {
- if fs::metadata(&self.config.get_streams_path()).await.is_err() {
- fs::create_dir(&self.config.get_streams_path())
- .await
- .unwrap();
- }
- }
-
- pub async fn create_stream_directory(&self, stream_id: u32) {
- self.create_streams_directory().await;
- if fs::metadata(&self.config.get_stream_path(stream_id as usize))
- .await
- .is_err()
- {
- fs::create_dir(&self.config.get_stream_path(stream_id as usize))
- .await
- .unwrap();
- }
- }
-
- pub async fn create_topics_directory(&self, stream_id: u32) {
- self.create_stream_directory(stream_id).await;
- if fs::metadata(&self.config.get_topics_path(stream_id as usize))
- .await
- .is_err()
- {
- fs::create_dir(&self.config.get_topics_path(stream_id as usize))
- .await
- .unwrap();
- }
- }
-
- pub async fn create_topic_directory(&self, stream_id: u32, topic_id: u32) {
- self.create_topics_directory(stream_id).await;
- if fs::metadata(
- &self
- .config
- .get_topic_path(stream_id as usize, topic_id as usize),
- )
- .await
- .is_err()
- {
- fs::create_dir(
- &self
- .config
- .get_topic_path(stream_id as usize, topic_id as usize),
- )
- .await
- .unwrap();
- }
- }
-
- pub async fn create_partitions_directory(&self, stream_id: u32, topic_id:
u32) {
- self.create_topic_directory(stream_id, topic_id).await;
- if fs::metadata(
- &self
- .config
- .get_partitions_path(stream_id as usize, topic_id as usize),
- )
- .await
- .is_err()
- {
- fs::create_dir(
- &self
- .config
- .get_partitions_path(stream_id as usize, topic_id as
usize),
- )
- .await
- .unwrap();
- }
- }
-
- pub async fn create_partition_directory(
- &self,
- stream_id: u32,
- topic_id: u32,
- partition_id: u32,
- ) {
- self.create_partitions_directory(stream_id, topic_id).await;
- if fs::metadata(&self.config.get_partition_path(
- stream_id as usize,
- topic_id as usize,
- partition_id as usize,
- ))
- .await
- .is_err()
- {
- fs::create_dir(&self.config.get_partition_path(
- stream_id as usize,
- topic_id as usize,
- partition_id as usize,
- ))
- .await
- .unwrap();
- }
+ TestSetup { config }
}
}
diff --git a/core/integration/tests/streaming/get_by_offset.rs
b/core/integration/tests/streaming/get_by_offset.rs
index 7fd903bb..35810ee3 100644
--- a/core/integration/tests/streaming/get_by_offset.rs
+++ b/core/integration/tests/streaming/get_by_offset.rs
@@ -221,7 +221,7 @@ async fn test_get_messages_by_offset(
// Test 1: All messages from start
let args = PollingArgs::new(PollingStrategy::offset(0),
total_sent_messages, false);
let (_, all_loaded_messages) = streams
- .poll_messages(&namespace, consumer.clone(), args)
+ .poll_messages(&namespace, consumer, args)
.await
.unwrap();
assert_eq!(
@@ -244,7 +244,7 @@ async fn test_get_messages_by_offset(
false,
);
let (_, middle_messages) = streams
- .poll_messages(&namespace, consumer.clone(), args)
+ .poll_messages(&namespace, consumer, args)
.await
.unwrap();
@@ -262,7 +262,7 @@ async fn test_get_messages_by_offset(
let final_offset = *batch_offsets.last().unwrap();
let args = PollingArgs::new(PollingStrategy::offset(final_offset + 1),
1, false);
let (_, no_messages) = streams
- .poll_messages(&namespace, consumer.clone(), args)
+ .poll_messages(&namespace, consumer, args)
.await
.unwrap();
assert_eq!(
@@ -277,7 +277,7 @@ async fn test_get_messages_by_offset(
let subset_size = std::cmp::min(3, total_sent_messages);
let args = PollingArgs::new(PollingStrategy::offset(0), subset_size,
false);
let (_, subset_messages) = streams
- .poll_messages(&namespace, consumer.clone(), args)
+ .poll_messages(&namespace, consumer, args)
.await
.unwrap();
assert_eq!(
@@ -294,7 +294,7 @@ async fn test_get_messages_by_offset(
let span_size = 8; // Should span across 2nd, 3rd, and into 4th batch
let args = PollingArgs::new(PollingStrategy::offset(span_offset),
span_size, false);
let (_, batches) = streams
- .poll_messages(&namespace, consumer.clone(), args)
+ .poll_messages(&namespace, consumer, args)
.await
.unwrap();
assert_eq!(
@@ -372,7 +372,7 @@ async fn test_get_messages_by_offset(
false,
);
let (_, chunk) = streams
- .poll_messages(&namespace, consumer.clone(), args)
+ .poll_messages(&namespace, consumer, args)
.await
.unwrap();
diff --git a/core/integration/tests/streaming/mod.rs
b/core/integration/tests/streaming/mod.rs
index c759c165..d6f55a84 100644
--- a/core/integration/tests/streaming/mod.rs
+++ b/core/integration/tests/streaming/mod.rs
@@ -16,8 +16,6 @@
* under the License.
*/
-use bytes::Bytes;
-use iggy::prelude::IggyMessage;
use iggy_common::{CompressionAlgorithm, Identifier, IggyError, IggyExpiry,
MaxTopicSize};
use server::{
configs::system::SystemConfig,
@@ -59,7 +57,7 @@ async fn bootstrap_test_environment(
let streams = Streams::default();
// Create stream together with its dirs
let stream = stream2::create_and_insert_stream_mem(&streams, stream_name);
- create_stream_file_hierarchy(shard_id, stream.id(), &config).await?;
+ create_stream_file_hierarchy(shard_id, stream.id(), config).await?;
// Create topic together with its dirs
let stream_id = Identifier::numeric(stream.id() as u32).unwrap();
let parent_stats = streams.with_stream_by_id(&stream_id, |(_, stats)|
stats.clone());
@@ -76,7 +74,7 @@ async fn bootstrap_test_environment(
max_topic_size,
parent_stats,
);
- create_topic_file_hierarchy(shard_id, stream.id(), topic.id(),
&config).await?;
+ create_topic_file_hierarchy(shard_id, stream.id(), topic.id(),
config).await?;
// Create partition together with its dirs
let topic_id = Identifier::numeric(topic.id() as u32).unwrap();
let parent_stats = streams.with_topic_by_id(
@@ -93,7 +91,7 @@ async fn bootstrap_test_environment(
config,
);
for partition in partitions {
- create_partition_file_hierarchy(shard_id, stream.id(), topic.id(),
partition.id(), &config)
+ create_partition_file_hierarchy(shard_id, stream.id(), topic.id(),
partition.id(), config)
.await?;
// Open the log
@@ -106,7 +104,7 @@ async fn bootstrap_test_environment(
let messages_size = 0;
let indexes_size = 0;
let storage = create_segment_storage(
- &config,
+ config,
stream.id(),
topic.id(),
partition.id(),
@@ -131,24 +129,4 @@ async fn bootstrap_test_environment(
partition_id: 0,
task_registry,
})
-}
-
-fn create_messages() -> Vec<IggyMessage> {
- vec![
- create_message(1, "message 1"),
- create_message(2, "message 2"),
- create_message(3, "message 3"),
- create_message(4, "message 3.2"),
- create_message(5, "message 1.2"),
- create_message(6, "message 3.3"),
- ]
-}
-
-fn create_message(id: u128, payload: &str) -> IggyMessage {
- let payload = Bytes::from(payload.to_string());
- IggyMessage::builder()
- .id(id)
- .payload(payload)
- .build()
- .expect("Failed to create message with valid payload and headers")
-}
+}
\ No newline at end of file
diff --git
a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
index 9458861d..96c1e43c 100644
---
a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
+++
b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
@@ -21,7 +21,6 @@ use crate::binary::handlers::consumer_offsets::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::sender::SenderKind;
use crate::shard::IggyShard;
-use crate::shard::transmission::event::ShardEvent;
use crate::streaming::session::Session;
use anyhow::Result;
use error_set::ErrContext;
@@ -43,7 +42,7 @@ impl ServerCommandHandler for DeleteConsumerOffset {
shard: &Rc<IggyShard>,
) -> Result<(), IggyError> {
debug!("session: {session}, command: {self}");
- let (polling_consumer, partition_id) = shard
+ shard
.delete_consumer_offset(
session,
self.consumer,
@@ -55,13 +54,6 @@ impl ServerCommandHandler for DeleteConsumerOffset {
.with_error_context(|error| format!("{COMPONENT} (error: {error})
- failed to delete consumer offset for topic with ID: {} in stream with ID: {}
partition ID: {:#?}, session: {}",
self.topic_id, self.stream_id, self.partition_id, session
))?;
- // TODO: Get rid of this event.
- let event = ShardEvent::DeletedOffset {
- stream_id: self.stream_id,
- topic_id: self.topic_id,
- partition_id,
- polling_consumer,
- };
sender.send_empty_ok_response().await?;
Ok(())
}
diff --git
a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
index 85eb95cd..db770e4a 100644
---
a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
+++
b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
@@ -23,7 +23,6 @@ use crate::binary::handlers::consumer_offsets::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::sender::SenderKind;
use crate::shard::IggyShard;
-use crate::shard::transmission::event::ShardEvent;
use crate::streaming::session::Session;
use anyhow::Result;
use error_set::ErrContext;
@@ -44,7 +43,7 @@ impl ServerCommandHandler for StoreConsumerOffset {
shard: &Rc<IggyShard>,
) -> Result<(), IggyError> {
debug!("session: {session}, command: {self}");
- let (polling_consumer, partition_id) = shard
+ shard
.store_consumer_offset(
session,
self.consumer,
@@ -57,14 +56,6 @@ impl ServerCommandHandler for StoreConsumerOffset {
.with_error_context(|error| format!("{COMPONENT} (error: {error})
- failed to store consumer offset for stream_id: {}, topic_id: {},
partition_id: {:?}, offset: {}, session: {}",
self.stream_id, self.topic_id, self.partition_id, self.offset,
session
))?;
- // TODO: Get rid of this event.
- let event = ShardEvent::StoredOffset {
- stream_id: self.stream_id,
- topic_id: self.topic_id,
- partition_id,
- polling_consumer,
- offset: self.offset,
- };
sender.send_empty_ok_response().await?;
Ok(())
}
diff --git a/core/server/src/binary/handlers/streams/get_stream_handler.rs
b/core/server/src/binary/handlers/streams/get_stream_handler.rs
index dfd4cc4a..195cd0b2 100644
--- a/core/server/src/binary/handlers/streams/get_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/get_stream_handler.rs
@@ -59,7 +59,7 @@ impl ServerCommandHandler for GetStream {
self.stream_id,
session.get_user_id(),
)
- });
+ })?;
let response = shard
.streams2
.with_components_by_id(stream_id, |(root, stats)|
mapper::map_stream(&root, &stats));
diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs
b/core/server/src/binary/handlers/topics/create_topic_handler.rs
index 2f1ee6cd..35eba8d9 100644
--- a/core/server/src/binary/handlers/topics/create_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs
@@ -71,7 +71,7 @@ impl ServerCommandHandler for CreateTopic {
stream_id: self.stream_id.clone(),
topic,
};
- let responses = shard.broadcast_event_to_all_shards(event).await;
+ let _responses = shard.broadcast_event_to_all_shards(event).await;
let partitions = shard
.create_partitions2(
session,
@@ -85,7 +85,7 @@ impl ServerCommandHandler for CreateTopic {
topic_id: Identifier::numeric(topic_id as u32).unwrap(),
partitions,
};
- let responses = shard.broadcast_event_to_all_shards(event).await;
+ let _responses = shard.broadcast_event_to_all_shards(event).await;
let response = shard.streams2.with_topic_by_id(
&self.stream_id,
&Identifier::numeric(topic_id as u32).unwrap(),
diff --git a/core/server/src/binary/handlers/topics/get_topics_handler.rs
b/core/server/src/binary/handlers/topics/get_topics_handler.rs
index 0c78e90e..52f23a7a 100644
--- a/core/server/src/binary/handlers/topics/get_topics_handler.rs
+++ b/core/server/src/binary/handlers/topics/get_topics_handler.rs
@@ -51,7 +51,7 @@ impl ServerCommandHandler for GetTopics {
shard
.permissioner
.borrow()
- .get_topics(session.get_user_id(), numeric_stream_id as u32);
+ .get_topics(session.get_user_id(), numeric_stream_id as u32)?;
let response = shard.streams2.with_topics(&self.stream_id, |topics| {
topics.with_components(|topics| {
diff --git a/core/server/src/binary/mapper.rs b/core/server/src/binary/mapper.rs
index ca43991d..6856dea2 100644
--- a/core/server/src/binary/mapper.rs
+++ b/core/server/src/binary/mapper.rs
@@ -23,7 +23,7 @@ use crate::slab::traits_ext::{EntityComponentSystem,
IntoComponents};
use crate::streaming::clients::client_manager::Client;
use crate::streaming::partitions::partition2::PartitionRoot;
use
crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
-use crate::streaming::stats::stats::{PartitionStats, StreamStats, TopicStats};
+use crate::streaming::stats::{PartitionStats, StreamStats, TopicStats};
use crate::streaming::streams::stream2;
use crate::streaming::topics::consumer_group2::{ConsumerGroupMembers,
ConsumerGroupRoot, Member};
use crate::streaming::topics::topic2::{self, TopicRoot};
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 09dca813..5a150ab1 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -32,7 +32,7 @@ use crate::{
persistence::persister::{FilePersister, FileWithSyncPersister,
PersisterKind},
personal_access_tokens::personal_access_token::PersonalAccessToken,
segments::{Segment2, storage::Storage},
- stats::stats::{PartitionStats, StreamStats, TopicStats},
+ stats::{PartitionStats, StreamStats, TopicStats},
storage::SystemStorage,
streams::stream2,
topics::{consumer_group2, topic2},
@@ -631,7 +631,7 @@ async fn load_partition(
parent_stats: Arc<TopicStats>,
) -> Result<partition2::Partition, IggyError> {
let stats = Arc::new(PartitionStats::new(parent_stats));
- let partition_id = partition_state.id as u32;
+ let partition_id = partition_state.id;
let partition_path = config.get_partition_path(stream_id, topic_id,
partition_id as usize);
let log_files = collect_log_files(&partition_path).await?;
diff --git a/core/server/src/http/http_server.rs
b/core/server/src/http/http_server.rs
index 317e9c61..617bd763 100644
--- a/core/server/src/http/http_server.rs
+++ b/core/server/src/http/http_server.rs
@@ -135,7 +135,8 @@ pub async fn start_http_server(
let service =
app.into_make_service_with_connect_info::<CompioSocketAddr>();
// Spawn the server in a task so we can handle shutdown
- let server_task =
+ // TODO(hubcio): investigate if we can use TaskRegistry here
+ let _server_task =
compio::runtime::spawn(async move { cyper_axum::serve(listener,
service).await });
// Wait for shutdown signal
diff --git a/core/server/src/http/mapper.rs b/core/server/src/http/mapper.rs
index df93990a..1fec7f99 100644
--- a/core/server/src/http/mapper.rs
+++ b/core/server/src/http/mapper.rs
@@ -21,7 +21,7 @@ use crate::slab::Keyed;
use crate::slab::traits_ext::{EntityComponentSystem, IntoComponents};
use crate::streaming::clients::client_manager::Client;
use
crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
-use crate::streaming::stats::stats::TopicStats;
+use crate::streaming::stats::TopicStats;
use crate::streaming::topics::consumer_group2::{ConsumerGroupMembers,
ConsumerGroupRoot};
use crate::streaming::topics::topic2::TopicRoot;
use crate::streaming::users::user::User;
@@ -259,7 +259,7 @@ pub fn map_generated_access_token_to_identity_info(token:
GeneratedToken) -> Ide
/// Map StreamRoot and StreamStats to StreamDetails for HTTP responses
pub fn map_stream_details(
root: &crate::streaming::streams::stream2::StreamRoot,
- stats: &crate::streaming::stats::stats::StreamStats,
+ stats: &crate::streaming::stats::StreamStats,
) -> iggy_common::StreamDetails {
// Get topics using the new slab-based API
let topics = root.topics().with_components(|topic_ref| {
@@ -291,10 +291,10 @@ pub fn map_stream_details(
}
}
-/// Map StreamRoot and StreamStats to Stream for HTTP responses
+/// Map StreamRoot and StreamStats to Stream for HTTP responses
pub fn map_stream(
root: &crate::streaming::streams::stream2::StreamRoot,
- stats: &crate::streaming::stats::stats::StreamStats,
+ stats: &crate::streaming::stats::StreamStats,
) -> iggy_common::Stream {
iggy_common::Stream {
id: root.id() as u32,
@@ -309,7 +309,7 @@ pub fn map_stream(
/// Map multiple streams from slabs
pub fn map_streams_from_slabs(
roots: &slab::Slab<crate::streaming::streams::stream2::StreamRoot>,
- stats: &slab::Slab<Arc<crate::streaming::stats::stats::StreamStats>>,
+ stats: &slab::Slab<Arc<crate::streaming::stats::StreamStats>>,
) -> Vec<iggy_common::Stream> {
let mut streams = Vec::new();
for (root, stat) in roots
diff --git a/core/server/src/http/streams.rs b/core/server/src/http/streams.rs
index 650262f7..ab71c551 100644
--- a/core/server/src/http/streams.rs
+++ b/core/server/src/http/streams.rs
@@ -150,7 +150,7 @@ async fn create_stream(
.shard()
.streams2
.with_components_by_id(created_stream_id, |(root, stats)| {
- crate::http::mapper::map_stream_details(&*root, &**stats)
+ crate::http::mapper::map_stream_details(&root, &stats)
})
})();
diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs
index 735a2e51..e24396ac 100644
--- a/core/server/src/lib.rs
+++ b/core/server/src/lib.rs
@@ -18,8 +18,6 @@
#[cfg(not(feature = "disable-mimalloc"))]
use mimalloc::MiMalloc;
-use nix::libc::c_void;
-use nix::libc::iovec;
#[cfg(not(feature = "disable-mimalloc"))]
#[global_allocator]
@@ -55,10 +53,3 @@ pub fn map_toggle_str<'a>(enabled: bool) -> &'a str {
false => "disabled",
}
}
-
-pub fn to_iovec<T>(data: &[T]) -> iovec {
- iovec {
- iov_base: data.as_ptr() as *mut c_void,
- iov_len: data.len() * std::mem::size_of::<T>(),
- }
-}
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 36091a5a..a4ecdcc7 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -52,7 +52,6 @@ use std::collections::HashSet;
use std::rc::Rc;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
-use tokio::time::Instant;
use tracing::{error, info, instrument, warn};
const COMPONENT: &str = "MAIN";
@@ -63,7 +62,6 @@ static SHUTDOWN_START_TIME: AtomicU64 = AtomicU64::new(0);
#[instrument(skip_all, name = "trace_start_server")]
#[compio::main]
async fn main() -> Result<(), ServerError> {
- let startup_timestamp = Instant::now();
let standard_font = FIGfont::standard().unwrap();
let figure = standard_font.convert("Iggy Server");
println!("{}", figure.unwrap());
@@ -373,7 +371,7 @@ async fn main() -> Result<(), ServerError> {
for (idx, handle) in handles.into_iter().enumerate() {
handle
.join()
- .expect(format!("Failed to join shard thread-{}", idx).as_str());
+ .unwrap_or_else(|_| panic!("Failed to join shard thread-{}", idx));
}
let shutdown_duration_msg = {
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index a439f7e3..5f25ce63 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -116,7 +116,7 @@ impl IggyShardBuilder {
let connections = self.connections.unwrap();
let encryptor = self.encryptor;
let version = self.version.unwrap();
- let (stop_sender, stop_receiver, frame_receiver) = connections
+ let (_, stop_receiver, frame_receiver) = connections
.iter()
.filter(|c| c.id == id)
.map(|c| {
@@ -150,10 +150,9 @@ impl IggyShardBuilder {
users: RefCell::new(users),
encryptor,
config,
- version,
+ _version: version,
state,
stop_receiver,
- stop_sender,
messages_receiver: Cell::new(Some(frame_receiver)),
metrics,
is_shutting_down: AtomicBool::new(false),
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 3c6e9348..896c202b 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -37,7 +37,7 @@ use crate::{
message::{ShardMessage, ShardRequest, ShardRequestPayload,
ShardSendRequestResult},
},
},
- shard_error, shard_info, shard_warn,
+ shard_error, shard_info,
slab::{streams::Streams, traits_ext::EntityMarker},
state::StateKind,
streaming::{
@@ -70,7 +70,7 @@ use std::{
time::{Duration, Instant},
};
use tracing::{debug, error, instrument, trace};
-use transmission::connector::{Receiver, ShardConnector, StopReceiver,
StopSender};
+use transmission::connector::{Receiver, ShardConnector, StopReceiver};
pub const COMPONENT: &str = "SHARD";
pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
@@ -123,7 +123,7 @@ impl ShardInfo {
pub struct IggyShard {
pub id: u16,
shards: Vec<Shard>,
- version: SemanticVersion,
+ _version: SemanticVersion,
// Heart transplant of the old streams structure.
pub(crate) streams2: Streams,
@@ -142,7 +142,6 @@ pub struct IggyShard {
pub(crate) metrics: Metrics,
pub messages_receiver: Cell<Option<Receiver<ShardFrame>>>,
pub(crate) stop_receiver: StopReceiver,
- pub(crate) stop_sender: StopSender,
pub(crate) is_shutting_down: AtomicBool,
pub(crate) tcp_bound_address: Cell<Option<SocketAddr>>,
pub(crate) quic_bound_address: Cell<Option<SocketAddr>>,
@@ -622,7 +621,7 @@ impl IggyShard {
),
};
- let batches = if consumer_offset.is_none() {
+ if consumer_offset.is_none() {
let batches = self
.streams2
.get_messages_by_offset(
@@ -652,8 +651,7 @@ impl IggyShard {
)
.await?;
Ok(batches)
- };
- batches
+ }
}
}?;
@@ -864,13 +862,6 @@ impl IggyShard {
// Notify config writer that a server has bound
let _ = self.config_writer_notify.try_send(());
}
- _ => {
- shard_warn!(
- self.id,
- "Received AddressBound event for unsupported
protocol: {:?}",
- protocol
- );
- }
}
Ok(())
}
@@ -991,29 +982,7 @@ impl IggyShard {
}
Ok(())
- }
- ShardEvent::StoredOffset {
- stream_id,
- topic_id,
- partition_id,
- polling_consumer,
- offset,
- } => {
- self.store_consumer_offset_bypass_auth(
- &stream_id,
- &topic_id,
- &polling_consumer,
- partition_id,
- offset,
- );
- Ok(())
- }
- ShardEvent::DeletedOffset {
- stream_id,
- topic_id,
- partition_id,
- polling_consumer,
- } => Ok(()),
+ },
ShardEvent::JoinedConsumerGroup {
client_id,
stream_id,
diff --git a/core/server/src/shard/system/messages.rs
b/core/server/src/shard/system/messages.rs
index ea834468..437c163e 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -368,11 +368,11 @@ impl IggyShard {
self.ensure_authenticated(session)?;
let numeric_stream_id = self
.streams2
- .with_stream_by_id(&stream_id, streams::helpers::get_stream_id());
+ .with_stream_by_id(stream_id, streams::helpers::get_stream_id());
let numeric_topic_id =
self.streams2
- .with_topic_by_id(&stream_id, &topic_id,
topics::helpers::get_topic_id());
+ .with_topic_by_id(stream_id, topic_id,
topics::helpers::get_topic_id());
// Validate permissions for given user on stream and topic.
self.permissioner
diff --git a/core/server/src/shard/system/partitions.rs
b/core/server/src/shard/system/partitions.rs
index 695006e2..0f742b2c 100644
--- a/core/server/src/shard/system/partitions.rs
+++ b/core/server/src/shard/system/partitions.rs
@@ -271,7 +271,7 @@ impl IggyShard {
let ns = IggyNamespace::new(numeric_stream_id, numeric_topic_id,
partition_id);
self.remove_shard_table_record(&ns);
- self.delete_partition_dir(numeric_stream_id, numeric_topic_id,
partition_id, &mut log)
+ self.delete_partition_dir(numeric_stream_id, numeric_topic_id,
partition_id)
.await?;
let segments_count = stats.segments_count_inconsistent();
let messages_count = stats.messages_count_inconsistent();
@@ -297,14 +297,12 @@ impl IggyShard {
stream_id: usize,
topic_id: usize,
partition_id: usize,
- log: &mut SegmentedLog<MemoryMessageJournal>,
) -> Result<(), IggyError> {
delete_partitions_from_disk(
self.id,
stream_id,
topic_id,
partition_id,
- log,
&self.config.system,
)
.await
diff --git a/core/server/src/shard/system/personal_access_tokens.rs
b/core/server/src/shard/system/personal_access_tokens.rs
index 62322e5e..6eb6a0cd 100644
--- a/core/server/src/shard/system/personal_access_tokens.rs
+++ b/core/server/src/shard/system/personal_access_tokens.rs
@@ -171,7 +171,7 @@ impl IggyShard {
let session = active_sessions
.iter()
.find(|s| s.client_id == client_id)
- .expect(format!("At this point session for {}, should exist.",
client_id).as_str());
+ .unwrap_or_else(|| panic!("At this point session for {}, should
exist.", client_id));
self.login_with_personal_access_token(token, Some(session))?;
Ok(())
}
diff --git a/core/server/src/shard/system/streams.rs
b/core/server/src/shard/system/streams.rs
index 1dbae2c6..b2679f07 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -108,8 +108,7 @@ impl IggyShard {
}
pub fn delete_stream2_bypass_auth(&self, id: &Identifier) ->
stream2::Stream {
- let stream = self.delete_stream2_base(id);
- stream
+ self.delete_stream2_base(id)
}
fn delete_stream2_base(&self, id: &Identifier) -> stream2::Stream {
diff --git a/core/server/src/shard/system/topics.rs
b/core/server/src/shard/system/topics.rs
index 385fb7c5..fbf583cd 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -21,7 +21,7 @@ use crate::shard::IggyShard;
use crate::shard_info;
use crate::slab::traits_ext::{EntityComponentSystem, EntityMarker, InsertCell,
IntoComponents};
use crate::streaming::session::Session;
-use crate::streaming::stats::stats::{StreamStats, TopicStats};
+use crate::streaming::stats::{StreamStats, TopicStats};
use crate::streaming::topics::storage2::{create_topic_file_hierarchy,
delete_topic_from_disk};
use crate::streaming::topics::topic2::{self};
use crate::streaming::{partitions, streams, topics};
@@ -31,9 +31,9 @@ use iggy_common::{
};
use std::str::FromStr;
use std::sync::Arc;
-use std::u32;
impl IggyShard {
+ #[allow(clippy::too_many_arguments)]
pub async fn create_topic2(
&self,
session: &Session,
@@ -94,40 +94,12 @@ impl IggyShard {
Ok(topic)
}
- fn create_and_insert_topics_mem(
- &self,
- stream_id: &Identifier,
- name: String,
- replication_factor: u8,
- message_expiry: IggyExpiry,
- compression: CompressionAlgorithm,
- max_topic_size: MaxTopicSize,
- parent_stats: Arc<StreamStats>,
- ) -> topic2::Topic {
- let stats = Arc::new(TopicStats::new(parent_stats));
- let now = IggyTimestamp::now();
- let mut topic = topic2::Topic::new(
- name,
- stats,
- now,
- replication_factor,
- message_expiry,
- compression,
- max_topic_size,
- );
-
- let id = self
- .streams2
- .with_topics(stream_id, |topics| topics.insert(topic.clone()));
- topic.update_id(id);
- topic
- }
-
pub fn create_topic2_bypass_auth(&self, stream_id: &Identifier, topic:
topic2::Topic) -> usize {
self.streams2
.with_topics(stream_id, |topics| topics.insert(topic))
}
+ #[allow(clippy::too_many_arguments)]
pub fn update_topic2(
&self,
session: &Session,
@@ -176,6 +148,7 @@ impl IggyShard {
Ok(())
}
+ #[allow(clippy::too_many_arguments)]
pub fn update_topic_bypass_auth2(
&self,
stream_id: &Identifier,
@@ -198,6 +171,7 @@ impl IggyShard {
Ok(())
}
+ #[allow(clippy::too_many_arguments)]
pub fn update_topic_base2(
&self,
stream_id: &Identifier,
diff --git a/core/server/src/shard/system/users.rs
b/core/server/src/shard/system/users.rs
index 573e1f2e..c13d0cbf 100644
--- a/core/server/src/shard/system/users.rs
+++ b/core/server/src/shard/system/users.rs
@@ -64,11 +64,7 @@ impl IggyShard {
pub fn try_get_user(&self, user_id: &Identifier) -> Result<Option<User>,
IggyError> {
match user_id.kind {
IdKind::Numeric => {
- let user = self
- .users
- .borrow()
- .get(&user_id.get_u32_value()?)
- .map(|user| user.clone());
+ let user =
self.users.borrow().get(&user_id.get_u32_value()?).cloned();
Ok(user)
}
IdKind::String => {
@@ -226,7 +222,6 @@ impl IggyShard {
fn delete_user_base(&self, user_id: &Identifier) -> Result<User,
IggyError> {
let existing_user_id;
- let existing_username;
{
let user = self.get_user(user_id).with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to get user
with id: {user_id}")
@@ -237,7 +232,6 @@ impl IggyShard {
}
existing_user_id = user.id;
- existing_username = user.username.clone();
}
let user = self
@@ -448,7 +442,7 @@ impl IggyShard {
let session = active_sessions
.iter()
.find(|s| s.client_id == client_id)
- .expect(format!("At this point session for {}, should exist.",
client_id).as_str());
+ .unwrap_or_else(|| panic!("At this point session for {}, should
exist.", client_id));
self.login_user_with_credentials(username, Some(password),
Some(session))?;
Ok(())
}
diff --git a/core/server/src/shard/system/utils.rs
b/core/server/src/shard/system/utils.rs
index d2518da7..a0a75313 100644
--- a/core/server/src/shard/system/utils.rs
+++ b/core/server/src/shard/system/utils.rs
@@ -120,9 +120,7 @@ impl IggyShard {
topics::helpers::get_current_partition_id_unchecked(member_id),
)
};
- let Some(partition_id) = partition_id else {
- return None;
- };
+ let partition_id = partition_id?;
Some((
PollingConsumer::consumer_group(cg_id, member_id),
diff --git a/core/server/src/shard/transmission/event.rs
b/core/server/src/shard/transmission/event.rs
index 61bd5647..a57417e5 100644
--- a/core/server/src/shard/transmission/event.rs
+++ b/core/server/src/shard/transmission/event.rs
@@ -1,7 +1,6 @@
use crate::streaming::{
partitions::partition2,
personal_access_tokens::personal_access_token::PersonalAccessToken,
- polling_consumer::PollingConsumer,
streams::stream2,
topics::{
consumer_group2::{self},
@@ -81,19 +80,6 @@ pub enum ShardEvent {
stream_id: Identifier,
topic_id: Identifier,
},
- StoredOffset {
- stream_id: Identifier,
- topic_id: Identifier,
- partition_id: usize,
- polling_consumer: PollingConsumer,
- offset: u64,
- },
- DeletedOffset {
- stream_id: Identifier,
- topic_id: Identifier,
- partition_id: usize,
- polling_consumer: PollingConsumer,
- },
CreatedUser {
user_id: u32,
username: String,
diff --git a/core/server/src/slab/consumer_groups.rs
b/core/server/src/slab/consumer_groups.rs
index 5f98ac5d..d66a41be 100644
--- a/core/server/src/slab/consumer_groups.rs
+++ b/core/server/src/slab/consumer_groups.rs
@@ -112,6 +112,10 @@ impl ConsumerGroups {
self.root.len()
}
+ pub fn is_empty(&self) -> bool {
+ self.root.is_empty()
+ }
+
pub fn get_index(&self, id: &Identifier) -> usize {
match id.kind {
iggy_common::IdKind::Numeric => id.get_u32_value().unwrap() as
usize,
diff --git a/core/server/src/slab/helpers.rs b/core/server/src/slab/helpers.rs
index d2fe7487..0a1c5145 100644
--- a/core/server/src/slab/helpers.rs
+++ b/core/server/src/slab/helpers.rs
@@ -17,12 +17,6 @@ where
|(root, ..)| f(root.topics())
}
-pub fn topics_async<O, F>(f: F) -> impl AsyncFnOnce(ComponentsById<StreamRef>)
-> O
-where
- F: for<'a> AsyncFnOnce(&'a Topics) -> O,
-{
- async |(root, ..)| f(root.topics()).await
-}
pub fn topics_mut<O, F>(f: F) -> impl FnOnce(ComponentsById<StreamRef>) -> O
where
F: for<'a> FnOnce(&'a Topics) -> O,
@@ -37,12 +31,6 @@ where
|(root, ..)| f(root.partitions())
}
-pub fn partitions_async<O, F>(f: F) -> impl
AsyncFnOnce(ComponentsById<TopicRef>) -> O
-where
- F: for<'a> AsyncFnOnce(&'a Partitions) -> O,
-{
- async |(root, ..)| f(root.partitions()).await
-}
pub fn partitions_mut<O, F>(f: F) -> impl FnOnce(ComponentsById<TopicRefMut>)
-> O
where
F: for<'a> FnOnce(&'a mut Partitions) -> O,
diff --git a/core/server/src/slab/partitions.rs
b/core/server/src/slab/partitions.rs
index f430d1ab..87de3186 100644
--- a/core/server/src/slab/partitions.rs
+++ b/core/server/src/slab/partitions.rs
@@ -13,7 +13,7 @@ use crate::{
PartitionRefMut,
},
},
- stats::stats::PartitionStats,
+ stats::PartitionStats,
},
};
use slab::Slab;
@@ -196,6 +196,10 @@ impl Partitions {
self.root.len()
}
+ pub fn is_empty(&self) -> bool {
+ self.root.is_empty()
+ }
+
pub fn insert_default_log(&mut self) -> ContainerId {
self.log.insert(Default::default())
}
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index f318327e..386c7e56 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -1,6 +1,7 @@
use crate::shard::task_registry::TaskRegistry;
use crate::shard_trace;
use crate::streaming::partitions as streaming_partitions;
+use crate::streaming::stats::StreamStats;
use crate::{
binary::handlers::messages::poll_messages_handler::IggyPollMetadata,
configs::{cache_indexes::CacheIndexesConfig, system::SystemConfig},
@@ -26,7 +27,6 @@ use crate::{
segments::{
IggyMessagesBatchMut, IggyMessagesBatchSet, Segment2,
storage::create_segment_storage,
},
- stats::stats::StreamStats,
streams::{
self,
stream2::{self, StreamRef, StreamRefMut},
@@ -381,10 +381,7 @@ impl MainOps for Streams {
};
let Some(consumer_offset) = consumer_offset else {
- let batches = self
- .get_messages_by_offset(stream_id, topic_id,
partition_id, 0, count)
- .await?;
- return Ok((metadata, batches));
+ return Err(IggyError::ConsumerOffsetNotFound(consumer_id));
};
let offset = consumer_offset + 1;
trace!(
@@ -451,7 +448,7 @@ impl Streams {
f: impl FnOnce(ComponentsById<StreamRef>) -> T,
) -> T {
let id = self.get_index(id);
- self.with_components_by_id(id, |stream| f(stream))
+ self.with_components_by_id(id, f)
}
pub fn with_stream_by_id_mut<T>(
@@ -460,7 +457,7 @@ impl Streams {
f: impl FnOnce(ComponentsById<StreamRefMut>) -> T,
) -> T {
let id = self.get_index(id);
- self.with_components_by_id_mut(id, |stream| f(stream))
+ self.with_components_by_id_mut(id, f)
}
pub fn with_topics<T>(&self, stream_id: &Identifier, f: impl
FnOnce(&Topics) -> T) -> T {
@@ -585,10 +582,6 @@ impl Streams {
})
}
- pub fn len(&self) -> usize {
- self.root.borrow().len()
- }
-
pub async fn get_messages_by_offset(
&self,
stream_id: &Identifier,
@@ -610,10 +603,6 @@ impl Streams {
let mut current_offset = offset;
for idx in range {
- if remaining_count == 0 {
- break;
- }
-
let (segment_start_offset, segment_end_offset) =
self.with_partition_by_id(
stream_id,
topic_id,
@@ -665,11 +654,16 @@ impl Streams {
}
batches.add_batch_set(messages);
+
+ if remaining_count == 0 {
+ break;
+ }
}
Ok(batches)
}
+ #[allow(clippy::too_many_arguments)]
async fn get_messages_by_offset_base(
&self,
stream_id: &Identifier,
@@ -799,6 +793,7 @@ impl Streams {
Ok(combined_batch_set)
}
+ #[allow(clippy::too_many_arguments)]
async fn load_messages_from_disk_by_offset(
&self,
stream_id: &Identifier,
@@ -893,10 +888,6 @@ impl Streams {
let mut batches = IggyMessagesBatchSet::empty();
for idx in range {
- if remaining_count == 0 {
- break;
- }
-
let segment_end_timestamp = self.with_partition_by_id(
stream_id,
topic_id,
@@ -929,6 +920,10 @@ impl Streams {
remaining_count = remaining_count.saturating_sub(messages_count);
batches.add_batch_set(messages);
+
+ if remaining_count == 0 {
+ break;
+ }
}
Ok(batches)
@@ -1313,7 +1308,6 @@ impl Streams {
partition_id,
streaming_partitions::helpers::update_index_and_increment_stats(
saved,
- batch_count,
config,
),
);
@@ -1335,26 +1329,26 @@ impl Streams {
return Ok(());
}
- if let Some(ref messages_writer) = storage.messages_writer {
- if let Err(e) = messages_writer.fsync().await {
- tracing::error!(
- "Failed to fsync messages writer for partition {}: {}",
- partition_id,
- e
- );
- return Err(e);
- }
+ if let Some(ref messages_writer) = storage.messages_writer
+ && let Err(e) = messages_writer.fsync().await
+ {
+ tracing::error!(
+ "Failed to fsync messages writer for partition {}: {}",
+ partition_id,
+ e
+ );
+ return Err(e);
}
- if let Some(ref index_writer) = storage.index_writer {
- if let Err(e) = index_writer.fsync().await {
- tracing::error!(
- "Failed to fsync index writer for partition {}: {}",
- partition_id,
- e
- );
- return Err(e);
- }
+ if let Some(ref index_writer) = storage.index_writer
+ && let Err(e) = index_writer.fsync().await
+ {
+ tracing::error!(
+ "Failed to fsync index writer for partition {}: {}",
+ partition_id,
+ e
+ );
+ return Err(e);
}
Ok(())
diff --git a/core/server/src/slab/topics.rs b/core/server/src/slab/topics.rs
index d93df224..d66880a9 100644
--- a/core/server/src/slab/topics.rs
+++ b/core/server/src/slab/topics.rs
@@ -15,7 +15,7 @@ use crate::{
},
},
streaming::{
- stats::stats::TopicStats,
+ stats::TopicStats,
topics::{
consumer_group2::{ConsumerGroupRef, ConsumerGroupRefMut},
topic2::{self, TopicRef, TopicRefMut},
@@ -196,7 +196,7 @@ impl Topics {
f: impl FnOnce(ComponentsById<TopicRef>) -> T,
) -> T {
let id = self.get_index(topic_id);
- self.with_components_by_id(id, |components| f(components))
+ self.with_components_by_id(id, f)
}
pub fn with_topic_by_id_mut<T>(
@@ -205,7 +205,7 @@ impl Topics {
f: impl FnOnce(ComponentsById<TopicRefMut>) -> T,
) -> T {
let id = self.get_index(topic_id);
- self.with_components_by_id_mut(id, |components| f(components))
+ self.with_components_by_id_mut(id, f)
}
pub fn with_consumer_groups<T>(
diff --git a/core/server/src/state/file.rs b/core/server/src/state/file.rs
index 3c4be921..54a59469 100644
--- a/core/server/src/state/file.rs
+++ b/core/server/src/state/file.rs
@@ -35,7 +35,7 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use tracing::{debug, error, info};
-pub const BUF_cursor_CAPACITY_BYTES: usize = 512 * 1000;
+pub const BUF_CURSOR_CAPACITY_BYTES: usize = 512 * 1000;
const FILE_STATE_PARSE_ERROR: &str = "STATE - failed to parse file state";
#[derive(Debug)]
diff --git a/core/server/src/state/system.rs b/core/server/src/state/system.rs
index b5535d0e..31275f56 100644
--- a/core/server/src/state/system.rs
+++ b/core/server/src/state/system.rs
@@ -106,26 +106,15 @@ impl SystemState {
// Create root user if does not exist.
let root_exists = state_entries
.iter()
- .find(|entry| {
+ .any(|entry| {
entry
.command()
- .and_then(|command| match command {
- EntryCommand::CreateUser(payload)
- if payload.user_id == DEFAULT_ROOT_USER_ID =>
- {
- Ok(true)
- }
- _ => Ok(false),
+ .map(|command| matches!(command,
EntryCommand::CreateUser(payload) if payload.user_id == DEFAULT_ROOT_USER_ID))
+ .unwrap_or_else(|err| {
+ error!("Failed to check if root user exists: {err}");
+ false
})
- .map_or_else(
- |err| {
- error!("Failed to check if root user exists:
{err}");
- false
- },
- |v| v,
- )
- })
- .is_some();
+ });
if !root_exists {
info!("No users found, creating the root user...");
diff --git a/core/server/src/streaming/deduplication/message_deduplicator.rs
b/core/server/src/streaming/deduplication/message_deduplicator.rs
index b5a01f32..96b55a74 100644
--- a/core/server/src/streaming/deduplication/message_deduplicator.rs
+++ b/core/server/src/streaming/deduplication/message_deduplicator.rs
@@ -35,11 +35,6 @@ impl Clone for MessageDeduplicator {
let builder = Self::setup_cache_builder(builder, self.max_entries,
self.ttl);
let cache = builder.build();
- self.cache.clone();
- // Transfer data from the original cache to the new one
- for (key, value) in self.cache.iter() {
- cache.insert(*key, value);
- }
Self {
ttl: self.ttl,
max_entries: self.max_entries,
diff --git a/core/server/src/streaming/partitions/consumer_offset.rs
b/core/server/src/streaming/partitions/consumer_offset.rs
index ac51b9da..49d30a94 100644
--- a/core/server/src/streaming/partitions/consumer_offset.rs
+++ b/core/server/src/streaming/partitions/consumer_offset.rs
@@ -13,8 +13,8 @@ pub struct ConsumerOffset {
impl Clone for ConsumerOffset {
fn clone(&self) -> Self {
Self {
- kind: self.kind.clone(),
- consumer_id: self.consumer_id.clone(),
+ kind: self.kind,
+ consumer_id: self.consumer_id,
offset: AtomicU64::new(0),
path: self.path.clone(),
}
diff --git a/core/server/src/streaming/partitions/helpers.rs
b/core/server/src/streaming/partitions/helpers.rs
index d0cff51b..9cdd6b2f 100644
--- a/core/server/src/streaming/partitions/helpers.rs
+++ b/core/server/src/streaming/partitions/helpers.rs
@@ -290,7 +290,6 @@ pub fn get_segment_range_by_timestamp(
timestamp: u64,
) -> impl FnOnce(ComponentsById<PartitionRef>) ->
Result<std::ops::Range<usize>, IggyError> {
move |(.., log)| -> Result<std::ops::Range<usize>, IggyError> {
- let segments = log.segments();
let start = log
.segments()
.iter()
@@ -513,7 +512,6 @@ pub fn persist_batch(
pub fn update_index_and_increment_stats(
saved: IggyByteSize,
- batch_count: u32,
config: &SystemConfig,
) -> impl FnOnce(ComponentsById<PartitionRefMut>) {
move |(.., log)| {
diff --git a/core/server/src/streaming/partitions/journal.rs
b/core/server/src/streaming/partitions/journal.rs
index 54dfa225..bcfbf111 100644
--- a/core/server/src/streaming/partitions/journal.rs
+++ b/core/server/src/streaming/partitions/journal.rs
@@ -119,5 +119,5 @@ pub trait Journal {
// This could be merged together with `append`, but not doing this for two
reasons.
// 1. In case of the `Journal` being used as part of structure that
utilizes interior mutability, async with borrow_mut is not possible.
// 2. Having it as separate function allows for more optimal usage
patterns, e.g. batching multiple appends before flushing.
- async fn flush(&self) -> Result<(), IggyError>;
+ fn flush(&self) -> impl Future<Output = Result<(), IggyError>>;
}
diff --git a/core/server/src/streaming/partitions/log.rs
b/core/server/src/streaming/partitions/log.rs
index 922f929b..e8fb56d5 100644
--- a/core/server/src/streaming/partitions/log.rs
+++ b/core/server/src/streaming/partitions/log.rs
@@ -17,8 +17,8 @@ where
journal: J,
// Ring buffer tracking recently accessed segment indices for cleanup
optimization.
// A background task uses this to identify and close file descriptors for
unused segments.
- access_map: AllocRingBuffer<usize>,
- cache: (),
+ _access_map: AllocRingBuffer<usize>,
+ _cache: (),
segments: Vec<Segment2>,
indexes: Vec<Option<IggyIndexesMut>>,
storage: Vec<Storage>,
@@ -31,8 +31,8 @@ where
fn default() -> Self {
Self {
journal: J::default(),
- access_map:
AllocRingBuffer::with_capacity_power_of_2(ACCESS_MAP_CAPACITY),
- cache: (),
+ _access_map:
AllocRingBuffer::with_capacity_power_of_2(ACCESS_MAP_CAPACITY),
+ _cache: (),
segments: Vec::with_capacity(SEGMENTS_CAPACITY),
storage: Vec::with_capacity(SEGMENTS_CAPACITY),
indexes: Vec::with_capacity(SEGMENTS_CAPACITY),
diff --git a/core/server/src/streaming/partitions/partition2.rs
b/core/server/src/streaming/partitions/partition2.rs
index d635e863..087d23b0 100644
--- a/core/server/src/streaming/partitions/partition2.rs
+++ b/core/server/src/streaming/partitions/partition2.rs
@@ -12,7 +12,7 @@ use crate::{
consumer_offset, helpers::create_message_deduplicator,
journal::MemoryMessageJournal,
log::SegmentedLog,
},
- stats::stats::{PartitionStats, TopicStats},
+ stats::{PartitionStats, TopicStats},
},
};
use iggy_common::{Identifier, IggyTimestamp};
@@ -95,6 +95,7 @@ pub struct Partition {
}
impl Partition {
+ #[allow(clippy::too_many_arguments)]
pub fn new(
created_at: IggyTimestamp,
should_increment_offset: bool,
diff --git a/core/server/src/streaming/partitions/storage2.rs
b/core/server/src/streaming/partitions/storage2.rs
index 4977121d..63e5cb65 100644
--- a/core/server/src/streaming/partitions/storage2.rs
+++ b/core/server/src/streaming/partitions/storage2.rs
@@ -4,7 +4,7 @@ use crate::{
io::fs_utils::remove_dir_all,
shard_error, shard_info, shard_trace,
streaming::partitions::{
- consumer_offset::ConsumerOffset, journal::MemoryMessageJournal,
log::SegmentedLog,
+ consumer_offset::ConsumerOffset,
},
};
use compio::{
@@ -109,11 +109,8 @@ pub async fn delete_partitions_from_disk(
stream_id: usize,
topic_id: usize,
partition_id: usize,
- log: &mut SegmentedLog<MemoryMessageJournal>,
config: &SystemConfig,
) -> Result<(), IggyError> {
- //TODO:
- //log.close().await;
let partition_path = config.get_partition_path(stream_id, topic_id,
partition_id);
remove_dir_all(&partition_path).await.map_err(|_| {
@@ -172,14 +169,14 @@ pub fn load_consumer_offsets(
kind: ConsumerKind,
) -> Result<Vec<ConsumerOffset>, IggyError> {
trace!("Loading consumer offsets from path: {path}...");
- let dir_entries = std::fs::read_dir(&path);
+ let dir_entries = std::fs::read_dir(path);
if dir_entries.is_err() {
return Err(IggyError::CannotReadConsumerOffsets(path.to_owned()));
}
let mut consumer_offsets = Vec::new();
- let mut dir_entries = dir_entries.unwrap();
- while let Some(dir_entry) = dir_entries.next() {
+ let dir_entries = dir_entries.unwrap();
+ for dir_entry in dir_entries {
let dir_entry = dir_entry.unwrap();
let metadata = dir_entry.metadata();
if metadata.is_err() {
diff --git a/core/server/src/streaming/persistence/mod.rs
b/core/server/src/streaming/persistence/mod.rs
index 97d3cba3..df7333ba 100644
--- a/core/server/src/streaming/persistence/mod.rs
+++ b/core/server/src/streaming/persistence/mod.rs
@@ -17,6 +17,5 @@
*/
pub mod persister;
-pub mod task;
pub const COMPONENT: &str = "STREAMING_PERSISTENCE";
diff --git a/core/server/src/streaming/persistence/task.rs
b/core/server/src/streaming/persistence/task.rs
deleted file mode 100644
index 1b359ef7..00000000
--- a/core/server/src/streaming/persistence/task.rs
+++ /dev/null
@@ -1,135 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::streaming::persistence::COMPONENT;
-use bytes::Bytes;
-use compio::runtime::Task;
-use error_set::ErrContext;
-use flume::{Receiver, Sender, unbounded};
-use iggy_common::IggyError;
-use std::{any::Any, sync::Arc, time::Duration};
-use tracing::error;
-
-use super::persister::PersisterKind;
-
-pub struct LogPersisterTask {
- _sender: Option<Sender<Bytes>>,
- _task_handle: Option<Task<Result<(), Box<dyn Any + Send>>>>,
-}
-
-impl std::fmt::Debug for LogPersisterTask {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("LogPersisterTask")
- .field("_sender", &self._sender.is_some())
- .field("_task_handle", &self._task_handle.is_some())
- .finish()
- }
-}
-
-impl LogPersisterTask {
- pub fn new(
- path: String,
- persister: Arc<PersisterKind>,
- max_retries: u32,
- retry_sleep: Duration,
- ) -> Self {
- let (sender, receiver): (Sender<Bytes>, Receiver<Bytes>) = unbounded();
-
- let task_handle = compio::runtime::spawn(async move {
- loop {
- match receiver.recv_async().await {
- Ok(data) => {
- if let Err(error) = Self::persist_with_retries(
- &path,
- &persister,
- data,
- max_retries,
- retry_sleep,
- )
- .await
- {
- error!("{COMPONENT} (error: {error}) - Final
failure to persist data.");
- }
- }
- Err(error) => {
- error!("{COMPONENT} (error: {error}) - Error receiving
data from channel.");
- return;
- }
- }
- }
- });
-
- LogPersisterTask {
- _sender: Some(sender),
- _task_handle: Some(task_handle),
- }
- }
-
- async fn persist_with_retries(
- path: &str,
- persister: &Arc<PersisterKind>,
- data: Bytes,
- max_retries: u32,
- retry_sleep: Duration,
- ) -> Result<(), String> {
- let mut retries = 0;
-
- while retries < max_retries {
- match persister.append(path, data.clone()).await {
- Ok(_) => return Ok(()),
- Err(e) => {
- error!(
- "Could not append to persister (attempt {}): {}",
- retries + 1,
- e
- );
- retries += 1;
- tokio::time::sleep(retry_sleep).await;
- }
- }
- }
-
- Err(format!(
- "{COMPONENT} - failed to persist data after {max_retries} retries",
- ))
- }
-
- pub async fn send(&self, data: Bytes) -> Result<(), IggyError> {
- if let Some(sender) = &self._sender {
- sender
- .send_async(data)
- .await
- .with_error_context(|error| {
- format!("{COMPONENT} (error: {error}) - failed to send
data to async channel")
- })
- .map_err(|_| IggyError::CannotSaveMessagesToSegment)
- } else {
- Err(IggyError::CannotSaveMessagesToSegment)
- }
- }
-}
-
-impl Drop for LogPersisterTask {
- fn drop(&mut self) {
- self._sender.take();
-
- if let Some(handle) = self._task_handle.take() {
- compio::runtime::spawn(async move { handle.await });
- }
- }
-}
diff --git a/core/server/src/streaming/segments/indexes/index_reader.rs
b/core/server/src/streaming/segments/indexes/index_reader.rs
index ff9800d7..33528c62 100644
--- a/core/server/src/streaming/segments/indexes/index_reader.rs
+++ b/core/server/src/streaming/segments/indexes/index_reader.rs
@@ -339,7 +339,7 @@ impl IndexReader {
) -> Result<PooledBuffer, std::io::Error> {
if use_pool {
let len = len as usize;
- let buf = PooledBuffer::with_capacity(len as usize);
+ let buf = PooledBuffer::with_capacity(len);
let (result, buf) = self
.file
.read_exact_at(buf.slice(..len), offset as u64)
diff --git a/core/server/src/streaming/stats/mod.rs
b/core/server/src/streaming/stats/mod.rs
index 9d34677f..67c464f3 100644
--- a/core/server/src/streaming/stats/mod.rs
+++ b/core/server/src/streaming/stats/mod.rs
@@ -1 +1,331 @@
-pub mod stats;
+use std::sync::{
+ Arc,
+ atomic::{AtomicU32, AtomicU64, Ordering},
+};
+
+#[derive(Default, Debug)]
+pub struct StreamStats {
+ size_bytes: AtomicU64,
+ messages_count: AtomicU64,
+ segments_count: AtomicU32,
+}
+
+impl StreamStats {
+ pub fn increment_size_bytes(&self, size_bytes: u64) {
+ self.size_bytes.fetch_add(size_bytes, Ordering::AcqRel);
+ }
+
+ pub fn increment_messages_count(&self, messages_count: u64) {
+ self.messages_count
+ .fetch_add(messages_count, Ordering::AcqRel);
+ }
+
+ pub fn increment_segments_count(&self, segments_count: u32) {
+ self.segments_count
+ .fetch_add(segments_count, Ordering::AcqRel);
+ }
+
+ pub fn decrement_size_bytes(&self, size_bytes: u64) {
+ self.size_bytes.fetch_sub(size_bytes, Ordering::AcqRel);
+ }
+
+ pub fn decrement_messages_count(&self, messages_count: u64) {
+ self.messages_count
+ .fetch_sub(messages_count, Ordering::AcqRel);
+ }
+
+ pub fn decrement_segments_count(&self, segments_count: u32) {
+ self.segments_count
+ .fetch_sub(segments_count, Ordering::AcqRel);
+ }
+
+ pub fn size_bytes_inconsistent(&self) -> u64 {
+ self.size_bytes.load(Ordering::Relaxed)
+ }
+
+ pub fn messages_count_inconsistent(&self) -> u64 {
+ self.messages_count.load(Ordering::Relaxed)
+ }
+
+ pub fn segments_count_inconsistent(&self) -> u32 {
+ self.segments_count.load(Ordering::Relaxed)
+ }
+
+ pub fn zero_out_size_bytes(&self) {
+ self.size_bytes.store(0, Ordering::Relaxed);
+ }
+
+ pub fn zero_out_messages_count(&self) {
+ self.messages_count.store(0, Ordering::Relaxed);
+ }
+
+ pub fn zero_out_segments_count(&self) {
+ self.segments_count.store(0, Ordering::Relaxed);
+ }
+
+ pub fn zero_out_all(&self) {
+ self.zero_out_size_bytes();
+ self.zero_out_messages_count();
+ self.zero_out_segments_count();
+ }
+}
+
+#[derive(Default, Debug)]
+pub struct TopicStats {
+ parent: Arc<StreamStats>,
+ size_bytes: AtomicU64,
+ messages_count: AtomicU64,
+ segments_count: AtomicU32,
+}
+
+impl TopicStats {
+ pub fn new(parent: Arc<StreamStats>) -> Self {
+ Self {
+ parent,
+ size_bytes: AtomicU64::new(0),
+ messages_count: AtomicU64::new(0),
+ segments_count: AtomicU32::new(0),
+ }
+ }
+
+ pub fn parent(&self) -> Arc<StreamStats> {
+ self.parent.clone()
+ }
+
+ pub fn increment_parent_size_bytes(&self, size_bytes: u64) {
+ self.parent.increment_size_bytes(size_bytes);
+ }
+
+ pub fn increment_parent_messages_count(&self, messages_count: u64) {
+ self.parent.increment_messages_count(messages_count);
+ }
+
+ pub fn increment_parent_segments_count(&self, segments_count: u32) {
+ self.parent.increment_segments_count(segments_count);
+ }
+
+ pub fn increment_size_bytes(&self, size_bytes: u64) {
+ self.size_bytes.fetch_add(size_bytes, Ordering::AcqRel);
+ self.increment_parent_size_bytes(size_bytes);
+ }
+
+ pub fn increment_messages_count(&self, messages_count: u64) {
+ self.messages_count
+ .fetch_add(messages_count, Ordering::AcqRel);
+ self.increment_parent_messages_count(messages_count);
+ }
+
+ pub fn increment_segments_count(&self, segments_count: u32) {
+ self.segments_count
+ .fetch_add(segments_count, Ordering::AcqRel);
+ self.increment_parent_segments_count(segments_count);
+ }
+
+ pub fn decrement_parent_size_bytes(&self, size_bytes: u64) {
+ self.parent.decrement_size_bytes(size_bytes);
+ }
+
+ pub fn decrement_parent_messages_count(&self, messages_count: u64) {
+ self.parent.decrement_messages_count(messages_count);
+ }
+
+ pub fn decrement_parent_segments_count(&self, segments_count: u32) {
+ self.parent.decrement_segments_count(segments_count);
+ }
+
+ pub fn decrement_size_bytes(&self, size_bytes: u64) {
+ self.size_bytes.fetch_sub(size_bytes, Ordering::AcqRel);
+ self.decrement_parent_size_bytes(size_bytes);
+ }
+
+ pub fn decrement_messages_count(&self, messages_count: u64) {
+ self.messages_count
+ .fetch_sub(messages_count, Ordering::AcqRel);
+ self.decrement_parent_messages_count(messages_count);
+ }
+
+ pub fn decrement_segments_count(&self, segments_count: u32) {
+ self.segments_count
+ .fetch_sub(segments_count, Ordering::AcqRel);
+ self.decrement_parent_segments_count(segments_count);
+ }
+
+ pub fn size_bytes_inconsistent(&self) -> u64 {
+ self.size_bytes.load(Ordering::Relaxed)
+ }
+
+ pub fn messages_count_inconsistent(&self) -> u64 {
+ self.messages_count.load(Ordering::Relaxed)
+ }
+
+ pub fn segments_count_inconsistent(&self) -> u32 {
+ self.segments_count.load(Ordering::Relaxed)
+ }
+
+ pub fn zero_out_parent_size_bytes(&self) {
+ self.parent.zero_out_size_bytes();
+ }
+
+ pub fn zero_out_parent_messages_count(&self) {
+ self.parent.zero_out_messages_count();
+ }
+
+ pub fn zero_out_parent_segments_count(&self) {
+ self.parent.zero_out_segments_count();
+ }
+
+ pub fn zero_out_parent_all(&self) {
+ self.parent.zero_out_all();
+ }
+
+ pub fn zero_out_size_bytes(&self) {
+ self.size_bytes.store(0, Ordering::Relaxed);
+ self.zero_out_parent_size_bytes();
+ }
+
+ pub fn zero_out_messages_count(&self) {
+ self.messages_count.store(0, Ordering::Relaxed);
+ self.zero_out_parent_messages_count();
+ }
+
+ pub fn zero_out_segments_count(&self) {
+ self.segments_count.store(0, Ordering::Relaxed);
+ self.zero_out_parent_segments_count();
+ }
+
+ pub fn zero_out_all(&self) {
+ self.zero_out_size_bytes();
+ self.zero_out_messages_count();
+ self.zero_out_segments_count();
+ }
+}
+
+#[derive(Default, Debug)]
+pub struct PartitionStats {
+ parent: Arc<TopicStats>,
+ messages_count: AtomicU64,
+ size_bytes: AtomicU64,
+ segments_count: AtomicU32,
+}
+
+impl PartitionStats {
+ pub fn new(parent_stats: Arc<TopicStats>) -> Self {
+ Self {
+ parent: parent_stats,
+ messages_count: AtomicU64::new(0),
+ size_bytes: AtomicU64::new(0),
+ segments_count: AtomicU32::new(0),
+ }
+ }
+
+ pub fn parent(&self) -> Arc<TopicStats> {
+ self.parent.clone()
+ }
+
+ pub fn increment_size_bytes(&self, size_bytes: u64) {
+ self.size_bytes.fetch_add(size_bytes, Ordering::AcqRel);
+ self.increment_parent_size_bytes(size_bytes);
+ }
+
+ pub fn increment_messages_count(&self, messages_count: u64) {
+ self.messages_count
+ .fetch_add(messages_count, Ordering::AcqRel);
+ self.increment_parent_messages_count(messages_count);
+ }
+
+ pub fn increment_segments_count(&self, segments_count: u32) {
+ self.segments_count
+ .fetch_add(segments_count, Ordering::AcqRel);
+ self.increment_parent_segments_count(segments_count);
+ }
+
+ pub fn increment_parent_size_bytes(&self, size_bytes: u64) {
+ self.parent.increment_size_bytes(size_bytes);
+ }
+
+ pub fn increment_parent_messages_count(&self, messages_count: u64) {
+ self.parent.increment_messages_count(messages_count);
+ }
+
+ pub fn increment_parent_segments_count(&self, segments_count: u32) {
+ self.parent.increment_segments_count(segments_count);
+ }
+
+ pub fn decrement_size_bytes(&self, size_bytes: u64) {
+ self.size_bytes.fetch_sub(size_bytes, Ordering::AcqRel);
+ self.decrement_parent_size_bytes(size_bytes);
+ }
+
+ pub fn decrement_messages_count(&self, messages_count: u64) {
+ self.messages_count
+ .fetch_sub(messages_count, Ordering::AcqRel);
+ self.decrement_parent_messages_count(messages_count);
+ }
+
+ pub fn decrement_segments_count(&self, segments_count: u32) {
+ self.segments_count
+ .fetch_sub(segments_count, Ordering::AcqRel);
+ self.decrement_parent_segments_count(segments_count);
+ }
+
+ pub fn decrement_parent_size_bytes(&self, size_bytes: u64) {
+ self.parent.decrement_size_bytes(size_bytes);
+ }
+
+ pub fn decrement_parent_messages_count(&self, messages_count: u64) {
+ self.parent.decrement_messages_count(messages_count);
+ }
+
+ pub fn decrement_parent_segments_count(&self, segments_count: u32) {
+ self.parent.decrement_segments_count(segments_count);
+ }
+
+ pub fn size_bytes_inconsistent(&self) -> u64 {
+ self.size_bytes.load(Ordering::Relaxed)
+ }
+
+ pub fn messages_count_inconsistent(&self) -> u64 {
+ self.messages_count.load(Ordering::Relaxed)
+ }
+
+ pub fn segments_count_inconsistent(&self) -> u32 {
+ self.segments_count.load(Ordering::Relaxed)
+ }
+
+ pub fn zero_out_parent_size_bytes(&self) {
+ self.parent.zero_out_size_bytes();
+ }
+
+ pub fn zero_out_parent_messages_count(&self) {
+ self.parent.zero_out_messages_count();
+ }
+
+ pub fn zero_out_parent_segments_count(&self) {
+ self.parent.zero_out_segments_count();
+ }
+
+ pub fn zero_out_parent_all(&self) {
+ self.parent.zero_out_all();
+ }
+
+ pub fn zero_out_size_bytes(&self) {
+ self.size_bytes.store(0, Ordering::Relaxed);
+ self.zero_out_parent_size_bytes();
+ }
+
+ pub fn zero_out_messages_count(&self) {
+ self.messages_count.store(0, Ordering::Relaxed);
+ self.zero_out_parent_messages_count();
+ }
+
+ pub fn zero_out_segments_count(&self) {
+ self.segments_count.store(0, Ordering::Relaxed);
+ self.zero_out_parent_segments_count();
+ }
+
+ pub fn zero_out_all(&self) {
+ self.zero_out_size_bytes();
+ self.zero_out_messages_count();
+ self.zero_out_segments_count();
+ }
+}
diff --git a/core/server/src/streaming/stats/stats.rs b/core/server/src/streaming/stats/stats.rs
deleted file mode 100644
index 67c464f3..00000000
--- a/core/server/src/streaming/stats/stats.rs
+++ /dev/null
@@ -1,331 +0,0 @@
-use std::sync::{
- Arc,
- atomic::{AtomicU32, AtomicU64, Ordering},
-};
-
-#[derive(Default, Debug)]
-pub struct StreamStats {
- size_bytes: AtomicU64,
- messages_count: AtomicU64,
- segments_count: AtomicU32,
-}
-
-impl StreamStats {
- pub fn increment_size_bytes(&self, size_bytes: u64) {
- self.size_bytes.fetch_add(size_bytes, Ordering::AcqRel);
- }
-
- pub fn increment_messages_count(&self, messages_count: u64) {
- self.messages_count
- .fetch_add(messages_count, Ordering::AcqRel);
- }
-
- pub fn increment_segments_count(&self, segments_count: u32) {
- self.segments_count
- .fetch_add(segments_count, Ordering::AcqRel);
- }
-
- pub fn decrement_size_bytes(&self, size_bytes: u64) {
- self.size_bytes.fetch_sub(size_bytes, Ordering::AcqRel);
- }
-
- pub fn decrement_messages_count(&self, messages_count: u64) {
- self.messages_count
- .fetch_sub(messages_count, Ordering::AcqRel);
- }
-
- pub fn decrement_segments_count(&self, segments_count: u32) {
- self.segments_count
- .fetch_sub(segments_count, Ordering::AcqRel);
- }
-
- pub fn size_bytes_inconsistent(&self) -> u64 {
- self.size_bytes.load(Ordering::Relaxed)
- }
-
- pub fn messages_count_inconsistent(&self) -> u64 {
- self.messages_count.load(Ordering::Relaxed)
- }
-
- pub fn segments_count_inconsistent(&self) -> u32 {
- self.segments_count.load(Ordering::Relaxed)
- }
-
- pub fn zero_out_size_bytes(&self) {
- self.size_bytes.store(0, Ordering::Relaxed);
- }
-
- pub fn zero_out_messages_count(&self) {
- self.messages_count.store(0, Ordering::Relaxed);
- }
-
- pub fn zero_out_segments_count(&self) {
- self.segments_count.store(0, Ordering::Relaxed);
- }
-
- pub fn zero_out_all(&self) {
- self.zero_out_size_bytes();
- self.zero_out_messages_count();
- self.zero_out_segments_count();
- }
-}
-
-#[derive(Default, Debug)]
-pub struct TopicStats {
- parent: Arc<StreamStats>,
- size_bytes: AtomicU64,
- messages_count: AtomicU64,
- segments_count: AtomicU32,
-}
-
-impl TopicStats {
- pub fn new(parent: Arc<StreamStats>) -> Self {
- Self {
- parent,
- size_bytes: AtomicU64::new(0),
- messages_count: AtomicU64::new(0),
- segments_count: AtomicU32::new(0),
- }
- }
-
- pub fn parent(&self) -> Arc<StreamStats> {
- self.parent.clone()
- }
-
- pub fn increment_parent_size_bytes(&self, size_bytes: u64) {
- self.parent.increment_size_bytes(size_bytes);
- }
-
- pub fn increment_parent_messages_count(&self, messages_count: u64) {
- self.parent.increment_messages_count(messages_count);
- }
-
- pub fn increment_parent_segments_count(&self, segments_count: u32) {
- self.parent.increment_segments_count(segments_count);
- }
-
- pub fn increment_size_bytes(&self, size_bytes: u64) {
- self.size_bytes.fetch_add(size_bytes, Ordering::AcqRel);
- self.increment_parent_size_bytes(size_bytes);
- }
-
- pub fn increment_messages_count(&self, messages_count: u64) {
- self.messages_count
- .fetch_add(messages_count, Ordering::AcqRel);
- self.increment_parent_messages_count(messages_count);
- }
-
- pub fn increment_segments_count(&self, segments_count: u32) {
- self.segments_count
- .fetch_add(segments_count, Ordering::AcqRel);
- self.increment_parent_segments_count(segments_count);
- }
-
- pub fn decrement_parent_size_bytes(&self, size_bytes: u64) {
- self.parent.decrement_size_bytes(size_bytes);
- }
-
- pub fn decrement_parent_messages_count(&self, messages_count: u64) {
- self.parent.decrement_messages_count(messages_count);
- }
-
- pub fn decrement_parent_segments_count(&self, segments_count: u32) {
- self.parent.decrement_segments_count(segments_count);
- }
-
- pub fn decrement_size_bytes(&self, size_bytes: u64) {
- self.size_bytes.fetch_sub(size_bytes, Ordering::AcqRel);
- self.decrement_parent_size_bytes(size_bytes);
- }
-
- pub fn decrement_messages_count(&self, messages_count: u64) {
- self.messages_count
- .fetch_sub(messages_count, Ordering::AcqRel);
- self.decrement_parent_messages_count(messages_count);
- }
-
- pub fn decrement_segments_count(&self, segments_count: u32) {
- self.segments_count
- .fetch_sub(segments_count, Ordering::AcqRel);
- self.decrement_parent_segments_count(segments_count);
- }
-
- pub fn size_bytes_inconsistent(&self) -> u64 {
- self.size_bytes.load(Ordering::Relaxed)
- }
-
- pub fn messages_count_inconsistent(&self) -> u64 {
- self.messages_count.load(Ordering::Relaxed)
- }
-
- pub fn segments_count_inconsistent(&self) -> u32 {
- self.segments_count.load(Ordering::Relaxed)
- }
-
- pub fn zero_out_parent_size_bytes(&self) {
- self.parent.zero_out_size_bytes();
- }
-
- pub fn zero_out_parent_messages_count(&self) {
- self.parent.zero_out_messages_count();
- }
-
- pub fn zero_out_parent_segments_count(&self) {
- self.parent.zero_out_segments_count();
- }
-
- pub fn zero_out_parent_all(&self) {
- self.parent.zero_out_all();
- }
-
- pub fn zero_out_size_bytes(&self) {
- self.size_bytes.store(0, Ordering::Relaxed);
- self.zero_out_parent_size_bytes();
- }
-
- pub fn zero_out_messages_count(&self) {
- self.messages_count.store(0, Ordering::Relaxed);
- self.zero_out_parent_messages_count();
- }
-
- pub fn zero_out_segments_count(&self) {
- self.segments_count.store(0, Ordering::Relaxed);
- self.zero_out_parent_segments_count();
- }
-
- pub fn zero_out_all(&self) {
- self.zero_out_size_bytes();
- self.zero_out_messages_count();
- self.zero_out_segments_count();
- }
-}
-
-#[derive(Default, Debug)]
-pub struct PartitionStats {
- parent: Arc<TopicStats>,
- messages_count: AtomicU64,
- size_bytes: AtomicU64,
- segments_count: AtomicU32,
-}
-
-impl PartitionStats {
- pub fn new(parent_stats: Arc<TopicStats>) -> Self {
- Self {
- parent: parent_stats,
- messages_count: AtomicU64::new(0),
- size_bytes: AtomicU64::new(0),
- segments_count: AtomicU32::new(0),
- }
- }
-
- pub fn parent(&self) -> Arc<TopicStats> {
- self.parent.clone()
- }
-
- pub fn increment_size_bytes(&self, size_bytes: u64) {
- self.size_bytes.fetch_add(size_bytes, Ordering::AcqRel);
- self.increment_parent_size_bytes(size_bytes);
- }
-
- pub fn increment_messages_count(&self, messages_count: u64) {
- self.messages_count
- .fetch_add(messages_count, Ordering::AcqRel);
- self.increment_parent_messages_count(messages_count);
- }
-
- pub fn increment_segments_count(&self, segments_count: u32) {
- self.segments_count
- .fetch_add(segments_count, Ordering::AcqRel);
- self.increment_parent_segments_count(segments_count);
- }
-
- pub fn increment_parent_size_bytes(&self, size_bytes: u64) {
- self.parent.increment_size_bytes(size_bytes);
- }
-
- pub fn increment_parent_messages_count(&self, messages_count: u64) {
- self.parent.increment_messages_count(messages_count);
- }
-
- pub fn increment_parent_segments_count(&self, segments_count: u32) {
- self.parent.increment_segments_count(segments_count);
- }
-
- pub fn decrement_size_bytes(&self, size_bytes: u64) {
- self.size_bytes.fetch_sub(size_bytes, Ordering::AcqRel);
- self.decrement_parent_size_bytes(size_bytes);
- }
-
- pub fn decrement_messages_count(&self, messages_count: u64) {
- self.messages_count
- .fetch_sub(messages_count, Ordering::AcqRel);
- self.decrement_parent_messages_count(messages_count);
- }
-
- pub fn decrement_segments_count(&self, segments_count: u32) {
- self.segments_count
- .fetch_sub(segments_count, Ordering::AcqRel);
- self.decrement_parent_segments_count(segments_count);
- }
-
- pub fn decrement_parent_size_bytes(&self, size_bytes: u64) {
- self.parent.decrement_size_bytes(size_bytes);
- }
-
- pub fn decrement_parent_messages_count(&self, messages_count: u64) {
- self.parent.decrement_messages_count(messages_count);
- }
-
- pub fn decrement_parent_segments_count(&self, segments_count: u32) {
- self.parent.decrement_segments_count(segments_count);
- }
-
- pub fn size_bytes_inconsistent(&self) -> u64 {
- self.size_bytes.load(Ordering::Relaxed)
- }
-
- pub fn messages_count_inconsistent(&self) -> u64 {
- self.messages_count.load(Ordering::Relaxed)
- }
-
- pub fn segments_count_inconsistent(&self) -> u32 {
- self.segments_count.load(Ordering::Relaxed)
- }
-
- pub fn zero_out_parent_size_bytes(&self) {
- self.parent.zero_out_size_bytes();
- }
-
- pub fn zero_out_parent_messages_count(&self) {
- self.parent.zero_out_messages_count();
- }
-
- pub fn zero_out_parent_segments_count(&self) {
- self.parent.zero_out_segments_count();
- }
-
- pub fn zero_out_parent_all(&self) {
- self.parent.zero_out_all();
- }
-
- pub fn zero_out_size_bytes(&self) {
- self.size_bytes.store(0, Ordering::Relaxed);
- self.zero_out_parent_size_bytes();
- }
-
- pub fn zero_out_messages_count(&self) {
- self.messages_count.store(0, Ordering::Relaxed);
- self.zero_out_parent_messages_count();
- }
-
- pub fn zero_out_segments_count(&self) {
- self.segments_count.store(0, Ordering::Relaxed);
- self.zero_out_parent_segments_count();
- }
-
- pub fn zero_out_all(&self) {
- self.zero_out_size_bytes();
- self.zero_out_messages_count();
- self.zero_out_segments_count();
- }
-}
diff --git a/core/server/src/streaming/streams/stream2.rs b/core/server/src/streaming/streams/stream2.rs
index d637e29d..f890d96d 100644
--- a/core/server/src/streaming/streams/stream2.rs
+++ b/core/server/src/streaming/streams/stream2.rs
@@ -12,7 +12,7 @@ use crate::{
topics::Topics,
traits_ext::{EntityMarker, InsertCell, IntoComponents, IntoComponentsById},
},
- streaming::stats::stats::StreamStats,
+ streaming::stats::StreamStats,
};
#[derive(Debug, Clone)]
diff --git a/core/server/src/streaming/topics/consumer_group2.rs b/core/server/src/streaming/topics/consumer_group2.rs
index 0236d3da..ca58d55f 100644
--- a/core/server/src/streaming/topics/consumer_group2.rs
+++ b/core/server/src/streaming/topics/consumer_group2.rs
@@ -196,8 +196,8 @@ pub struct Member {
impl Clone for Member {
fn clone(&self) -> Self {
Self {
- id: self.id.clone(),
- client_id: self.client_id.clone(),
+ id: self.id,
+ client_id: self.client_id,
partitions: self.partitions.clone(),
current_partition_idx: AtomicUsize::new(0),
}
diff --git a/core/server/src/streaming/topics/helpers.rs b/core/server/src/streaming/topics/helpers.rs
index 8b925a0d..976ffad9 100644
--- a/core/server/src/streaming/topics/helpers.rs
+++ b/core/server/src/streaming/topics/helpers.rs
@@ -5,12 +5,11 @@ use crate::{
slab::{
Keyed,
consumer_groups::{self, ConsumerGroups},
- partitions::{self},
topics::{self, Topics},
traits_ext::{ComponentsById, Delete, DeleteCell, EntityMarker},
},
streaming::{
- stats::stats::TopicStats,
+ stats::TopicStats,
topics::{
consumer_group2::{
self, ConsumerGroupMembers, ConsumerGroupRef,
ConsumerGroupRefMut, Member,
@@ -183,13 +182,13 @@ fn add_member(
shard_id: u16,
id: usize,
members: &mut ConsumerGroupMembers,
- partitions: &Vec<usize>,
+ partitions: &[usize],
client_id: u32,
) {
members.inner_mut().rcu(move |members| {
let mut members = mimic_members(members);
Member::new(client_id).insert_into(&mut members);
- assign_partitions_to_members(shard_id, id, &mut members, partitions.clone());
+ assign_partitions_to_members(shard_id, id, &mut members, partitions.to_vec());
members
});
}
@@ -199,7 +198,7 @@ fn delete_member(
id: usize,
client_id: u32,
members: &mut ConsumerGroupMembers,
- partitions: &Vec<usize>,
+ partitions: &[usize],
) {
let member_id = members
.inner()
@@ -214,7 +213,7 @@ fn delete_member(
entry.id = idx;
true
});
- assign_partitions_to_members(shard_id, id, &mut members, partitions.clone());
+ assign_partitions_to_members(shard_id, id, &mut members, partitions.to_vec());
members
});
}
@@ -253,29 +252,4 @@ fn mimic_members(members: &Slab<Member>) -> Slab<Member> {
Member::new(member.client_id).insert_into(&mut container);
}
container
-}
-
-fn reassign_partitions(
- shard_id: u16,
- partitions: Vec<partitions::ContainerId>,
-) -> impl FnOnce(ComponentsById<ConsumerGroupRefMut>) {
- move |(root, members)| {
- root.assign_partitions(partitions);
- let partitions = root.partitions();
- let id = root.id();
- reassign_partitions_to_members(shard_id, id, members, partitions);
- }
-}
-
-fn reassign_partitions_to_members(
- shard_id: u16,
- id: usize,
- members: &mut ConsumerGroupMembers,
- partitions: &Vec<usize>,
-) {
- members.inner_mut().rcu(move |members| {
- let mut members = mimic_members(members);
- assign_partitions_to_members(shard_id, id, &mut members, partitions.clone());
- members
- });
-}
+}
\ No newline at end of file
diff --git a/core/server/src/streaming/topics/storage2.rs b/core/server/src/streaming/topics/storage2.rs
index f355b74f..b166bc96 100644
--- a/core/server/src/streaming/topics/storage2.rs
+++ b/core/server/src/streaming/topics/storage2.rs
@@ -66,15 +66,14 @@ pub async fn delete_topic_from_disk(
let partitions = topic.root_mut().partitions_mut();
for id in ids {
let partition = partitions.delete(id);
- let (root, stats, _, _, _, _, mut log) = partition.into_components();
+ let (root, stats, _, _, _, _, _log) = partition.into_components();
let partition_id = root.id();
delete_partitions_from_disk(
shard_id,
stream_id,
topic_id,
partition_id,
- &mut log,
- &config,
+ config,
)
.await?;
messages_count += stats.messages_count_inconsistent();
diff --git a/core/server/src/streaming/topics/topic2.rs b/core/server/src/streaming/topics/topic2.rs
index 930d96af..cfa9e593 100644
--- a/core/server/src/streaming/topics/topic2.rs
+++ b/core/server/src/streaming/topics/topic2.rs
@@ -3,7 +3,7 @@ use crate::slab::streams::Streams;
use crate::slab::topics;
use crate::slab::traits_ext::{EntityMarker, InsertCell, IntoComponents, IntoComponentsById};
use crate::slab::{Keyed, consumer_groups::ConsumerGroups, partitions::Partitions};
-use crate::streaming::stats::stats::{StreamStats, TopicStats};
+use crate::streaming::stats::{StreamStats, TopicStats};
use iggy_common::{CompressionAlgorithm, Identifier, IggyExpiry, IggyTimestamp, MaxTopicSize};
use slab::Slab;
use std::cell::{Ref, RefMut};
@@ -326,6 +326,7 @@ impl TopicRoot {
}
// TODO: Move to separate module.
+#[allow(clippy::too_many_arguments)]
pub fn create_and_insert_topics_mem(
streams: &Streams,
stream_id: &Identifier,
diff --git a/core/server/src/streaming/utils/memory_pool.rs b/core/server/src/streaming/utils/memory_pool.rs
index ed9cb4e9..d3fb62fe 100644
--- a/core/server/src/streaming/utils/memory_pool.rs
+++ b/core/server/src/streaming/utils/memory_pool.rs
@@ -240,10 +240,8 @@ impl MemoryPool {
);
}
- if was_pool_allocated {
- if let Some(orig_idx) = self.best_fit(original_capacity) {
- self.dec_bucket_in_use(orig_idx);
- }
+ if was_pool_allocated && let Some(orig_idx) = self.best_fit(original_capacity) {
+ self.dec_bucket_in_use(orig_idx);
}
match self.best_fit(current_capacity) {
diff --git a/core/server/src/tcp/connection_handler.rs b/core/server/src/tcp/connection_handler.rs
index 1427640a..7e1c112d 100644
--- a/core/server/src/tcp/connection_handler.rs
+++ b/core/server/src/tcp/connection_handler.rs
@@ -71,7 +71,7 @@ pub(crate) async fn handle_connection(
let length =
u32::from_le_bytes(initial_buffer[0..INITIAL_BYTES_LENGTH].try_into().unwrap());
let (res, mut code_buffer_out) = sender.read(code_buffer).await;
- let _ = res?;
+ res?;
let code: u32 =
u32::from_le_bytes(code_buffer_out[0..INITIAL_BYTES_LENGTH].try_into().unwrap());
initial_buffer.clear();
diff --git a/core/server/src/tcp/tcp_listener.rs b/core/server/src/tcp/tcp_listener.rs
index 8fc8cbd3..5332ec19 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -89,7 +89,8 @@ pub async fn start(
format!("Failed to bind {server_name} server to address: {addr}, {err}")
})?;
let actual_addr = listener.local_addr().map_err(|e| {
- shard_error!(shard.id, "Failed to get local address: {e}");
+ // TODO(hubcio): macro doesn't work properly with syntax like {e}
+ shard_error!(shard.id, "Failed to get local address: {}", e);
IggyError::CannotBindToSocket(addr.to_string())
})?;
shard_info!(
diff --git a/core/server/src/tcp/tcp_server.rs b/core/server/src/tcp/tcp_server.rs
index 180388f7..6051d518 100644
--- a/core/server/src/tcp/tcp_server.rs
+++ b/core/server/src/tcp/tcp_server.rs
@@ -34,7 +34,6 @@ pub async fn spawn_tcp_server(
} else {
"Iggy TCP"
};
- let ip_v6 = shard.config.tcp.ipv6;
let socket_config = &shard.config.tcp.socket;
let addr: SocketAddr = shard
.config
diff --git a/core/server/src/tcp/tcp_tls_listener.rs b/core/server/src/tcp/tcp_tls_listener.rs
index c69326d2..376e6930 100644
--- a/core/server/src/tcp/tcp_tls_listener.rs
+++ b/core/server/src/tcp/tcp_tls_listener.rs
@@ -194,7 +194,6 @@ async fn accept_loop(
let acceptor = acceptor.clone();
// Perform TLS handshake in a separate task to avoid blocking the accept loop
- let task_shard = shard_clone.clone();
let registry = shard.task_registry.clone();
let registry_clone = registry.clone();
registry.spawn_connection(async move {