This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch message_bus_final in repository https://gitbox.apache.org/repos/asf/iggy.git
commit c0e69adbf21ddac9ab2f7fe4845766945f11e9b0 Author: numminex <[email protected]> AuthorDate: Thu Dec 4 20:24:02 2025 +0100 temp --- Cargo.lock | 4 + core/common/src/types/alloc/buffer.rs | 0 core/common/src/types/alloc/memory_pool.rs | 0 core/common/src/types/alloc/mod.rs | 0 core/common/src/types/sender/mod.rs | 0 core/common/src/types/sender/quic_sender.rs | 0 core/common/src/types/sender/tcp_sender.rs | 0 core/common/src/types/sender/tcp_tls_sender.rs | 0 core/common/src/types/sender/websocket_sender.rs | 0 .../src/types/sender/websocket_tls_sender.rs | 0 core/message_bus/Cargo.toml | 2 + core/message_bus/src/lib.rs | 339 ++++++++++++++++++++- 12 files changed, 342 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 59712c563..5946717b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5784,6 +5784,10 @@ checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" [[package]] name = "message_bus" version = "0.1.0" +dependencies = [ + "iggy_common", + "rand 0.9.2", +] [[package]] name = "metadata" diff --git a/core/common/src/types/alloc/buffer.rs b/core/common/src/types/alloc/buffer.rs new file mode 100644 index 000000000..e69de29bb diff --git a/core/common/src/types/alloc/memory_pool.rs b/core/common/src/types/alloc/memory_pool.rs new file mode 100644 index 000000000..e69de29bb diff --git a/core/common/src/types/alloc/mod.rs b/core/common/src/types/alloc/mod.rs new file mode 100644 index 000000000..e69de29bb diff --git a/core/common/src/types/sender/mod.rs b/core/common/src/types/sender/mod.rs new file mode 100644 index 000000000..e69de29bb diff --git a/core/common/src/types/sender/quic_sender.rs b/core/common/src/types/sender/quic_sender.rs new file mode 100644 index 000000000..e69de29bb diff --git a/core/common/src/types/sender/tcp_sender.rs b/core/common/src/types/sender/tcp_sender.rs new file mode 100644 index 000000000..e69de29bb diff --git a/core/common/src/types/sender/tcp_tls_sender.rs b/core/common/src/types/sender/tcp_tls_sender.rs new file mode 100644 index 000000000..e69de29bb diff --git a/core/common/src/types/sender/websocket_sender.rs b/core/common/src/types/sender/websocket_sender.rs new file mode 100644 index 000000000..e69de29bb diff --git a/core/common/src/types/sender/websocket_tls_sender.rs b/core/common/src/types/sender/websocket_tls_sender.rs new file mode 100644 index 000000000..e69de29bb diff --git a/core/message_bus/Cargo.toml b/core/message_bus/Cargo.toml index 713b8fec9..4286a3a8e 100644 --- a/core/message_bus/Cargo.toml +++ b/core/message_bus/Cargo.toml @@ -28,5 +28,7 @@ repository = "https://github.com/apache/iggy" readme = "../../../README.md" [dependencies] +iggy_common = { workspace = true } +rand = { workspace = true } diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/lib.rs index 974061d6c..7a60c7cb8 100644 --- a/core/message_bus/src/lib.rs +++ b/core/message_bus/src/lib.rs @@ -14,9 +14,342 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +use iggy_common::{SenderKind, TcpSender}; +use rand::{SeedableRng, rngs::StdRng, seq::SliceRandom}; +use std::{ + cell::RefCell, + collections::{HashMap, HashSet}, rc::Rc, +}; -pub trait MessageBus {} +const MAX_CONNECTIONS_PER_REPLICA: usize = 8; -pub struct IggyMessageBus {} +/// Trait for changesets - the Cache type is a GAT defined by the changeset +pub trait ChangeSet { + type Cache; + + fn apply(self, cache: &mut Self::Cache); +} -impl MessageBus for IggyMessageBus {} +/// Allocation strategy that produces changesets +pub trait AllocationStrategy { + /// The changeset type - defines its own cache type via GAT + type ChangeSet: ChangeSet; + /// The resource identifier type (e.g., replica id) + type Resource; + + fn allocate(&self, resource: Self::Resource) -> Option<Self::ChangeSet>; + fn deallocate(&self, resource: Self::Resource) -> Option<Self::ChangeSet>; +} + +/// Coordinator that ties a strategy to sharded caches +/// Cache type is derived from the strategy's changeset +pub struct Coordinator<S: AllocationStrategy> { + strategy: S, +} + +impl<S: AllocationStrategy> Coordinator<S> { + pub fn new(strategy: S) -> Self { + Self { strategy } + } + + pub fn allocate(&self, resource: S::Resource) -> Option<S::ChangeSet> { + self.strategy.allocate(resource) + } + + pub fn deallocate(&self, resource: S::Resource) -> Option<S::ChangeSet> { + self.strategy.deallocate(resource) + } +} + +/// Binds a coordinator with its matching cache +pub struct ShardedAllocation<S: AllocationStrategy> { + pub coordinator: Coordinator<S>, + pub cache: <S::ChangeSet as ChangeSet>::Cache, +} + +impl<S: AllocationStrategy> ShardedAllocation<S> { + pub fn new( + coordinator: Coordinator<S>, + cache: <S::ChangeSet as ChangeSet>::Cache, + ) -> Self { + Self { coordinator, cache } + } + + pub fn allocate(&mut self, resource: S::Resource) -> bool { + if let Some(changes) = self.coordinator.allocate(resource) { + changes.apply(&mut self.cache); + true + } else { + false + } + } + + pub fn deallocate(&mut self, resource: S::Resource) -> bool { + if let Some(changes) = self.coordinator.deallocate(resource) { + changes.apply(&mut self.cache); + true + } else { + false + } + } +} + +/// Message bus parameterized by allocation strategy +pub trait MessageBus { + type Strategy: AllocationStrategy; + type ClientId; + + fn send_to_client(&self, client_id: Self::ClientId, data: Vec<u8>) -> Result<(), String>; +} + +// ============================================================================ +// Concrete Implementation: Connection-based allocation +// ============================================================================ + +/// Identifies a connection on a specific shard +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct ConnectionAssignment { + pub replica: u8, + pub shard: u16, +} + +/// Maps a source shard to the shard that owns the connection +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct ShardAssignment { + pub replica: u8, + pub shard: u16, + pub conn_shard: u16, +} + +/// Cache for connection state per shard +pub struct ConnectionCache { + shard_id: u16, + connections: HashMap<u8, Option<Rc<TcpSender>>>, + connection_map: HashMap<u8, u16>, +} + +impl ConnectionCache { + pub fn new(shard_id: u16) -> Self { + Self { + shard_id, + connections: HashMap::new(), + connection_map: HashMap::new(), + } + } + + pub fn get_connection(&self, replica: u8) -> Option<Rc<TcpSender>> { + self.connections.get(&replica).and_then(|opt| opt.clone()) + } + + pub fn get_mapped_shard(&self, replica: u8) -> Option<u16> { + self.connection_map.get(&replica).copied() + } +} + +/// Changeset for connection-based allocation +#[derive(Debug, Clone)] +pub enum ConnectionChanges { + Allocate { + connections: Vec<ConnectionAssignment>, + mappings: Vec<ShardAssignment>, + }, + Deallocate { + connections: Vec<ConnectionAssignment>, + mappings: Vec<ConnectionAssignment>, + }, +} + +impl ChangeSet for ConnectionChanges { + type Cache = ConnectionCache; + + fn apply(self, cache: &mut Self::Cache) { + let shard_id = cache.shard_id; + match self { + ConnectionChanges::Allocate { connections, mappings } => { + for conn in connections.iter().filter(|c| c.shard == shard_id) { + cache.connections.insert(conn.replica, None); + } + for mapping in mappings.iter().filter(|m| m.shard == shard_id) { + cache.connection_map.insert(mapping.replica, mapping.conn_shard); + } + } + ConnectionChanges::Deallocate { connections, mappings } => { + for conn in connections.iter().filter(|c| c.shard == shard_id) { + cache.connections.remove(&conn.replica); + } + for mapping in mappings.iter().filter(|m| m.shard == shard_id) { + cache.connection_map.remove(&mapping.replica); + } + } + } + } +} + +/// Least-loaded allocation strategy for connections +pub struct LeastLoadedStrategy { + total_shards: usize, + connections_per_shard: RefCell<Vec<(u16, usize)>>, + replica_to_shards: RefCell<HashMap<u8, HashSet<u16>>>, + rng_seed: u64, +} + +impl LeastLoadedStrategy { + pub fn new(total_shards: usize, seed: u64) -> Self { + Self { + total_shards, + connections_per_shard: RefCell::new((0..total_shards).map(|s| (s as u16, 0)).collect()), + replica_to_shards: RefCell::new(HashMap::new()), + rng_seed: seed, + } + } + + fn create_shard_mappings( + &self, + mappings: &mut Vec<ShardAssignment>, + replica: u8, + mut conn_shards: Vec<u16>, + ) { + for shard in &conn_shards { + mappings.push(ShardAssignment { + replica, + shard: *shard, + conn_shard: *shard, + }); + } + + let mut rng = StdRng::seed_from_u64(self.rng_seed); + conn_shards.shuffle(&mut rng); + + let mut j = 0; + for shard in 0..self.total_shards { + let shard = shard as u16; + if conn_shards.contains(&shard) { + continue; + } + let conn_idx = j % conn_shards.len(); + mappings.push(ShardAssignment { + replica, + shard, + conn_shard: conn_shards[conn_idx], + }); + j += 1; + } + } +} + +impl AllocationStrategy for LeastLoadedStrategy { + type ChangeSet = ConnectionChanges; + type Resource = u8; + + fn allocate(&self, replica: Self::Resource) -> Option<Self::ChangeSet> { + if self.replica_to_shards.borrow().contains_key(&replica) { + return None; + } + + let mut connections = Vec::new(); + let mut mappings = Vec::new(); + let connections_needed = self.total_shards.min(MAX_CONNECTIONS_PER_REPLICA); + + let mut rng = StdRng::seed_from_u64(self.rng_seed); + self.connections_per_shard.borrow_mut().shuffle(&mut rng); + self.connections_per_shard + .borrow_mut() + .sort_by_key(|(_, count)| *count); + + let mut assigned_shards = HashSet::with_capacity(connections_needed); + + for i in 0..connections_needed { + let mut connections_per_shard = self.connections_per_shard.borrow_mut(); + let (shard, count) = connections_per_shard.get_mut(i).unwrap(); + connections.push(ConnectionAssignment { + replica, + shard: *shard, + }); + *count += 1; + assigned_shards.insert(*shard); + } + + self.replica_to_shards + .borrow_mut() + .insert(replica, assigned_shards.clone()); + + self.create_shard_mappings(&mut mappings, replica, assigned_shards.into_iter().collect()); + + Some(ConnectionChanges::Allocate { connections, mappings }) + } + + fn deallocate(&self, replica: Self::Resource) -> Option<Self::ChangeSet> { + let conn_shards = self.replica_to_shards.borrow_mut().remove(&replica)?; + + let mut connections = Vec::new(); + let mut mappings = Vec::new(); + + for shard in &conn_shards { + if let Some((_, count)) = self + .connections_per_shard + .borrow_mut() + .iter_mut() + .find(|(s, _)| s == shard) + { + *count = count.saturating_sub(1); + } + connections.push(ConnectionAssignment { + replica, + shard: *shard, + }); + } + + for shard in 0..self.total_shards { + let shard = shard as u16; + mappings.push(ConnectionAssignment { replica, shard }); + } + + Some(ConnectionChanges::Deallocate { connections, mappings }) + } +} + +pub struct IggyMessageBus { + clients: HashMap<u128, SenderKind>, + replicas: ShardedAllocation<LeastLoadedStrategy>, + shard_id: u16, +} + +impl IggyMessageBus { + pub fn new(total_shards: usize, shard_id: u16, seed: u64) -> Self { + Self { + clients: HashMap::new(), + replicas: ShardedAllocation::new( + Coordinator::new(LeastLoadedStrategy::new(total_shards, seed)), + ConnectionCache::new(shard_id), + ), + shard_id, + } + } + + pub fn add_replica(&mut self, replica: u8) -> bool { + self.replicas.allocate(replica) + } + + pub fn remove_replica(&mut self, replica: u8) -> bool { + self.replicas.deallocate(replica) + } + + pub fn get_replica_connection(&self, replica: u8) -> Option<Rc<TcpSender>> { + let mapped_shard = self.replicas.cache.get_mapped_shard(replica)?; + if mapped_shard == self.shard_id { + self.replicas.cache.get_connection(replica) + } else { + None + } + } +} + +impl MessageBus for IggyMessageBus { + type Strategy = LeastLoadedStrategy; + type ClientId = u128; + + fn send_to_client(&self, _client_id: Self::ClientId, _data: Vec<u8>) -> Result<(), String> { + // TODO: Implementation + Ok(()) + } +}
