This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch rebase_master in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 110f8f417fb0649954314920df875fca4e4e3e78 Author: Hubert Gruszecki <[email protected]> AuthorDate: Tue Jun 24 18:55:03 2025 +0200 fix issues with RefCell in core/server/src/streaming/topics/consumer_groups* files --- core/server/src/streaming/topics/consumer_group.rs | 25 ++-- .../server/src/streaming/topics/consumer_groups.rs | 152 ++++++++++++--------- 2 files changed, 94 insertions(+), 83 deletions(-) diff --git a/core/server/src/streaming/topics/consumer_group.rs b/core/server/src/streaming/topics/consumer_group.rs index 43d9611b..cc28ef6f 100644 --- a/core/server/src/streaming/topics/consumer_group.rs +++ b/core/server/src/streaming/topics/consumer_group.rs @@ -18,7 +18,6 @@ use ahash::AHashMap; use iggy_common::IggyError; -use tokio::sync::RwLock; use tracing::trace; #[derive(Debug, Clone)] @@ -58,10 +57,13 @@ impl ConsumerGroup { self.assign_partitions().await; } - pub async fn calculate_partition_id(&self, member_id: u32) -> Result<Option<u32>, IggyError> { - let member = self.members.get(&member_id); + pub async fn calculate_partition_id( + &mut self, + member_id: u32, + ) -> Result<Option<u32>, IggyError> { + let member = self.members.get_mut(&member_id); if let Some(member) = member { - return Ok(member.await.calculate_partition_id()); + return Ok(member.calculate_partition_id()); } Err(IggyError::ConsumerGroupMemberNotFound( member_id, @@ -73,7 +75,7 @@ impl ConsumerGroup { pub async fn get_current_partition_id(&self, member_id: u32) -> Result<Option<u32>, IggyError> { let member = self.members.get(&member_id); if let Some(member) = member { - return Ok(member.read().await.current_partition_id); + return Ok(member.current_partition_id); } Err(IggyError::ConsumerGroupMemberNotFound( member_id, @@ -85,12 +87,12 @@ impl ConsumerGroup { pub async fn add_member(&mut self, member_id: u32) { self.members.insert( member_id, - RwLock::new(ConsumerGroupMember { + ConsumerGroupMember { id: member_id, partitions: AHashMap::new(), current_partition_index: None, current_partition_id: None, - }), + }, ); trace!( "Added member with ID: {} to consumer group: {} for topic with ID: {}", @@ -117,7 +119,6 @@ impl ConsumerGroup { let members_count = members.len() as u32; for member in members.iter_mut() { - let mut member = member.write().await; member.current_partition_index = None; member.current_partition_id = None; member.partitions.clear(); @@ -126,8 +127,7 @@ impl ConsumerGroup { for partition_index in 0..self.partitions_count { let partition_id = partition_index + 1; let member_index = partition_index % members_count; - let member = members.get(member_index as usize).unwrap(); - let mut member = member.write().await; + let member = members.get_mut(member_index as usize).unwrap(); let member_partition_index = member.partitions.len() as u32; member .partitions @@ -213,7 +213,6 @@ mod tests { consumer_group.add_member(member_id).await; let member = consumer_group.members.get(&member_id).unwrap(); - let member = member.read().await; assert_eq!( member.partitions.len() as u32, consumer_group.partitions_count @@ -240,8 +239,6 @@ mod tests { consumer_group.add_member(member2_id).await; let member1 = consumer_group.members.get(&member1_id).unwrap(); let member2 = consumer_group.members.get(&member2_id).unwrap(); - let member1 = member1.read().await; - let member2 = member2.read().await; assert_eq!( member1.partitions.len() + member2.partitions.len(), consumer_group.partitions_count as usize @@ -277,8 +274,6 @@ mod tests { consumer_group.add_member(member2_id).await; let member1 = consumer_group.members.get(&member1_id).unwrap(); let member2 = consumer_group.members.get(&member2_id).unwrap(); - let member1 = member1.read().await; - let member2 = member2.read().await; if member1.partitions.len() == 1 { assert_eq!(member2.partitions.len(), 0); } else { diff --git a/core/server/src/streaming/topics/consumer_groups.rs b/core/server/src/streaming/topics/consumer_groups.rs index 26505355..1f50a1bb 100644 --- a/core/server/src/streaming/topics/consumer_groups.rs +++ b/core/server/src/streaming/topics/consumer_groups.rs @@ -21,37 +21,31 @@ use crate::streaming::topics::consumer_group::ConsumerGroup; use crate::streaming::topics::topic::Topic; use error_set::ErrContext; use iggy_common::IggyError; -use iggy_common::locking::IggySharedMutFn; use iggy_common::{IdKind, Identifier}; use std::sync::atomic::Ordering; -use tokio::sync::RwLock; use tracing::info; impl Topic { - pub async fn reassign_consumer_groups(&mut self) { + pub fn reassign_consumer_groups(&mut self) { if self.consumer_groups.is_empty() { return; } - let partitions_count = self.partitions.len() as u32; + let partitions_count = self.partitions.borrow().len() as u32; info!( "Reassigning consumer groups for topic with ID: {} for stream with ID with {}, partitions count: {}", self.topic_id, self.stream_id, partitions_count ); for (_, consumer_group) in self.consumer_groups.iter_mut() { - let mut consumer_group = consumer_group.write().await; - consumer_group.reassign_partitions(partitions_count).await; + consumer_group.reassign_partitions(partitions_count); } } - pub fn get_consumer_groups(&self) -> Vec<&RwLock<ConsumerGroup>> { + pub fn get_consumer_groups(&self) -> Vec<&ConsumerGroup> { self.consumer_groups.values().collect() } - pub fn get_consumer_group( - &self, - identifier: &Identifier, - ) -> Result<&RwLock<ConsumerGroup>, IggyError> { + pub fn get_consumer_group(&self, identifier: &Identifier) -> Result<&ConsumerGroup, IggyError> { match identifier.kind { IdKind::Numeric => self.get_consumer_group_by_id(identifier.get_u32_value().unwrap()), IdKind::String => self.get_consumer_group_by_name(&identifier.get_cow_str_value()?), @@ -61,7 +55,7 @@ impl Topic { pub fn try_get_consumer_group( &self, identifier: &Identifier, - ) -> Result<Option<&RwLock<ConsumerGroup>>, IggyError> { + ) -> Result<Option<&ConsumerGroup>, IggyError> { match identifier.kind { IdKind::Numeric => Ok(self.consumer_groups.get(&identifier.get_u32_value()?)), IdKind::String => { @@ -70,16 +64,13 @@ impl Topic { } } - fn try_get_consumer_group_by_name(&self, name: &str) -> Option<&RwLock<ConsumerGroup>> { + fn try_get_consumer_group_by_name(&self, name: &str) -> Option<&ConsumerGroup> { self.consumer_groups_ids .get(name) .and_then(|id| self.consumer_groups.get(id)) } - pub fn get_consumer_group_by_name( - &self, - name: &str, - ) -> Result<&RwLock<ConsumerGroup>, IggyError> { + pub fn get_consumer_group_by_name(&self, name: &str) -> Result<&ConsumerGroup, IggyError> { let group_id = self.consumer_groups_ids.get(name); if group_id.is_none() { return Err(IggyError::ConsumerGroupNameNotFound( @@ -91,7 +82,7 @@ impl Topic { self.get_consumer_group_by_id(*group_id.unwrap()) } - pub fn get_consumer_group_by_id(&self, id: u32) -> Result<ConsumerGroup, IggyError> { + pub fn get_consumer_group_by_id(&self, id: u32) -> Result<&ConsumerGroup, IggyError> { let consumer_group = self.consumer_groups.get(&id); if consumer_group.is_none() { return Err(IggyError::ConsumerGroupIdNotFound(id, self.topic_id)); @@ -100,7 +91,46 @@ impl Topic { Ok(consumer_group.unwrap()) } - pub async fn create_consumer_group( + pub fn get_consumer_group_mut( + &mut self, + identifier: &Identifier, + ) -> Result<&mut ConsumerGroup, IggyError> { + match identifier.kind { + IdKind::Numeric => { + self.get_consumer_group_by_id_mut(identifier.get_u32_value().unwrap()) + } + IdKind::String => self.get_consumer_group_by_name_mut(&identifier.get_cow_str_value()?), + } + } + + pub fn get_consumer_group_by_id_mut( + &mut self, + id: u32, + ) -> Result<&mut ConsumerGroup, IggyError> { + let consumer_group = self.consumer_groups.get_mut(&id); + if consumer_group.is_none() { + return Err(IggyError::ConsumerGroupIdNotFound(id, self.topic_id)); + } + + Ok(consumer_group.unwrap()) + } + + pub fn get_consumer_group_by_name_mut( + &mut self, + name: &str, + ) -> Result<&mut ConsumerGroup, IggyError> { + let group_id = self.consumer_groups_ids.get(name).copied(); + if group_id.is_none() { + return Err(IggyError::ConsumerGroupNameNotFound( + name.to_string(), + self.name.to_owned(), + )); + } + + self.get_consumer_group_by_id_mut(group_id.unwrap()) + } + + pub fn create_consumer_group( &mut self, group_id: Option<u32>, name: &str, @@ -137,15 +167,20 @@ impl Topic { return Err(IggyError::ConsumerGroupIdAlreadyExists(id, self.topic_id)); } - let consumer_group = - ConsumerGroup::new(self.topic_id, id, name, self.partitions.borrow().len() as u32); - self.consumer_groups.insert(id, consumer_group.clone()); + let consumer_group = ConsumerGroup::new( + self.topic_id, + id, + name, + self.partitions.borrow().len() as u32, + ); self.consumer_groups_ids.insert(name.to_owned(), id); + let cloned_group = consumer_group.clone(); + self.consumer_groups.insert(id, consumer_group); info!( "Created consumer group with ID: {} for topic with ID: {} and stream with ID: {}.", id, self.topic_id, self.stream_id ); - consumer_group + Ok(cloned_group) } pub async fn delete_consumer_group( @@ -157,7 +192,6 @@ impl Topic { let consumer_group = self.get_consumer_group(id).with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to get consumer group with id: {id}") })?; - let consumer_group = consumer_group.read().await; group_id = consumer_group.group_id; } @@ -167,7 +201,6 @@ impl Topic { } let consumer_group = consumer_group.unwrap(); { - let group_id = consumer_group.group_id; self.consumer_groups_ids.remove(&consumer_group.name); let current_group_id = self.current_consumer_group_id.load(Ordering::SeqCst); if current_group_id > group_id { @@ -193,16 +226,15 @@ impl Topic { Ok(consumer_group) } - pub async fn join_consumer_group( - &self, + pub fn join_consumer_group( + &mut self, group_id: &Identifier, member_id: u32, ) -> Result<(), IggyError> { - let consumer_group = self.get_consumer_group(group_id).with_error_context(|error| { + let consumer_group = self.get_consumer_group_mut(group_id).with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to get consumer group with id: {group_id}") })?; - let mut consumer_group = consumer_group.write().await; - consumer_group.add_member(member_id).await; + consumer_group.add_member(member_id); info!( "Member with ID: {} has joined consumer group with ID: {} for topic with ID: {} and stream with ID: {}.", member_id, group_id, self.topic_id, self.stream_id @@ -210,16 +242,15 @@ impl Topic { Ok(()) } - pub async fn leave_consumer_group( - &self, + pub fn leave_consumer_group( + &mut self, group_id: &Identifier, member_id: u32, ) -> Result<(), IggyError> { - let consumer_group = self.get_consumer_group(group_id).with_error_context(|error| { + let consumer_group = self.get_consumer_group_mut(group_id).with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to get consumer group with id: {group_id}") })?; - let mut consumer_group = consumer_group.write().await; - consumer_group.delete_member(member_id).await; + consumer_group.delete_member(member_id); info!( "Member with ID: {} has left consumer group with ID: {} for topic with ID: {} and stream with ID: {}.", member_id, group_id, self.topic_id, self.stream_id @@ -236,6 +267,7 @@ mod tests { use crate::streaming::storage::SystemStorage; use crate::streaming::utils::MemoryPool; use iggy_common::{CompressionAlgorithm, IggyExpiry, MaxTopicSize}; + use std::rc::Rc; use std::sync::Arc; use std::sync::atomic::{AtomicU32, AtomicU64}; @@ -245,10 +277,10 @@ mod tests { let name = "test"; let mut topic = get_topic().await; let topic_id = topic.topic_id; - let result = topic.create_consumer_group(Some(group_id), name).await; + let result = topic.create_consumer_group(Some(group_id), name); assert!(result.is_ok()); { - let created_consumer_group = result.unwrap().read().await; + let created_consumer_group = result.unwrap(); assert_eq!(created_consumer_group.group_id, group_id); assert_eq!(created_consumer_group.name, name); assert_eq!(created_consumer_group.topic_id, topic_id); @@ -258,13 +290,12 @@ mod tests { let consumer_group = topic .get_consumer_group(&Identifier::numeric(group_id).unwrap()) .unwrap(); - let consumer_group = consumer_group.read().await; assert_eq!(consumer_group.group_id, group_id); assert_eq!(consumer_group.name, name); assert_eq!(consumer_group.topic_id, topic_id); assert_eq!( consumer_group.partitions_count, - topic.partitions.len() as u32 + topic.partitions.borrow().len() as u32 ); } @@ -273,10 +304,10 @@ mod tests { let group_id = 1; let name = "test"; let mut topic = get_topic().await; - let result = topic.create_consumer_group(Some(group_id), name).await; + let result = topic.create_consumer_group(Some(group_id), name); assert!(result.is_ok()); assert_eq!(topic.consumer_groups.len(), 1); - let result = topic.create_consumer_group(Some(group_id), "test2").await; + let result = topic.create_consumer_group(Some(group_id), "test2"); assert!(result.is_err()); let err = result.unwrap_err(); assert!(matches!(err, IggyError::ConsumerGroupIdAlreadyExists(_, _))); @@ -288,11 +319,11 @@ mod tests { let group_id = 1; let name = "test"; let mut topic = get_topic().await; - let result = topic.create_consumer_group(Some(group_id), name).await; + let result = topic.create_consumer_group(Some(group_id), name); assert!(result.is_ok()); assert_eq!(topic.consumer_groups.len(), 1); let group_id = group_id + 1; - let result = topic.create_consumer_group(Some(group_id), name).await; + let result = topic.create_consumer_group(Some(group_id), name); assert!(result.is_err()); let err = result.unwrap_err(); assert!(matches!( @@ -307,7 +338,7 @@ mod tests { let group_id = 1; let name = "test"; let mut topic = get_topic().await; - let result = topic.create_consumer_group(Some(group_id), name).await; + let result = topic.create_consumer_group(Some(group_id), name); assert!(result.is_ok()); assert_eq!(topic.consumer_groups.len(), 1); let result = topic @@ -322,7 +353,7 @@ mod tests { let group_id = 1; let name = "test"; let mut topic = get_topic().await; - let result = topic.create_consumer_group(Some(group_id), name).await; + let result = topic.create_consumer_group(Some(group_id), name); assert!(result.is_ok()); assert_eq!(topic.consumer_groups.len(), 1); let group_id = group_id + 1; @@ -339,19 +370,12 @@ mod tests { let name = "test"; let member_id = 1; let mut topic = get_topic().await; - topic - .create_consumer_group(Some(group_id), name) - .await - .unwrap(); - let result = topic - .join_consumer_group(&Identifier::numeric(group_id).unwrap(), member_id) - .await; + topic.create_consumer_group(Some(group_id), name).unwrap(); + let result = topic.join_consumer_group(&Identifier::numeric(group_id).unwrap(), member_id); assert!(result.is_ok()); let consumer_group = topic .get_consumer_group(&Identifier::numeric(group_id).unwrap()) - .unwrap() - .read() - .await; + .unwrap(); let members = consumer_group.get_members(); assert_eq!(members.len(), 1); } @@ -362,34 +386,26 @@ mod tests { let name = "test"; let member_id = 1; let mut topic = get_topic().await; - topic - .create_consumer_group(Some(group_id), name) - .await - .unwrap(); + topic.create_consumer_group(Some(group_id), name).unwrap(); topic .join_consumer_group(&Identifier::numeric(group_id).unwrap(), member_id) - .await .unwrap(); - let result = topic - .leave_consumer_group(&Identifier::numeric(group_id).unwrap(), member_id) - .await; + let result = topic.leave_consumer_group(&Identifier::numeric(group_id).unwrap(), member_id); assert!(result.is_ok()); let consumer_group = topic .get_consumer_group(&Identifier::numeric(group_id).unwrap()) - .unwrap() - .read() - .await; + .unwrap(); let members = consumer_group.get_members(); assert!(members.is_empty()) } async fn get_topic() -> Topic { let tempdir = tempfile::TempDir::new().unwrap(); - let config = Arc::new(SystemConfig { + let config = Rc::new(SystemConfig { path: tempdir.path().to_str().unwrap().to_string(), ..Default::default() }); - let storage = Arc::new(SystemStorage::new( + let storage = Rc::new(SystemStorage::new( config.clone(), Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})), ));
