This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new b64a1333c feat(message_bus): create message_bus interface and basic
impl for IggyMessageBus (#2446)
b64a1333c is described below
commit b64a1333cf8b1e70f2495207f25e5ededdbd0028
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Sat Dec 6 14:23:23 2025 +0100
feat(message_bus): create message_bus interface and basic impl for
IggyMessageBus (#2446)
---
Cargo.lock | 4 +
core/message_bus/Cargo.toml | 2 +
core/message_bus/src/cache/connection.rs | 311 ++++++++++++++++++++++++++
core/message_bus/src/{lib.rs => cache/mod.rs} | 14 +-
core/message_bus/src/lib.rs | 114 +++++++++-
5 files changed, 439 insertions(+), 6 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index d6ee2f530..cc9d1f831 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5786,6 +5786,10 @@ checksum =
"f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273"
[[package]]
name = "message_bus"
version = "0.1.0"
+dependencies = [
+ "iggy_common",
+ "rand 0.9.2",
+]
[[package]]
name = "metadata"
diff --git a/core/message_bus/Cargo.toml b/core/message_bus/Cargo.toml
index 713b8fec9..4286a3a8e 100644
--- a/core/message_bus/Cargo.toml
+++ b/core/message_bus/Cargo.toml
@@ -28,5 +28,7 @@ repository = "https://github.com/apache/iggy"
readme = "../../../README.md"
[dependencies]
+iggy_common = { workspace = true }
+rand = { workspace = true }
diff --git a/core/message_bus/src/cache/connection.rs
b/core/message_bus/src/cache/connection.rs
new file mode 100644
index 000000000..d07ac66ec
--- /dev/null
+++ b/core/message_bus/src/cache/connection.rs
@@ -0,0 +1,311 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+use crate::cache::AllocationStrategy;
+use iggy_common::TcpSender;
+use rand::{SeedableRng, rngs::StdRng, seq::SliceRandom};
+use std::{
+ cell::RefCell,
+ collections::{HashMap, HashSet},
+ rc::Rc,
+};
+
+const MAX_CONNECTIONS_PER_REPLICA: usize = 8;
+
+// TODO: Move to some common trait location.
+pub trait ShardedState {
+ type Entry;
+ type Delta;
+
+ fn apply(&mut self, delta: Self::Delta);
+}
+
+/// Least-loaded allocation strategy for connections
+pub struct LeastLoadedStrategy {
+ total_shards: usize,
+ connections_per_shard: RefCell<Vec<(u16, usize)>>,
+ replica_to_shards: RefCell<HashMap<u8, HashSet<u16>>>,
+ rng_seed: u64,
+}
+
+impl LeastLoadedStrategy {
+ pub fn new(total_shards: usize, seed: u64) -> Self {
+ Self {
+ total_shards,
+ connections_per_shard: RefCell::new((0..total_shards).map(|s| (s
as u16, 0)).collect()),
+ replica_to_shards: RefCell::new(HashMap::new()),
+ rng_seed: seed,
+ }
+ }
+
+ fn create_shard_mappings(
+ &self,
+ mappings: &mut Vec<ShardAssignment>,
+ replica: u8,
+ mut conn_shards: Vec<u16>,
+ ) {
+ for shard in &conn_shards {
+ mappings.push(ShardAssignment {
+ replica,
+ shard: *shard,
+ conn_shard: *shard,
+ });
+ }
+
+ let mut rng = StdRng::seed_from_u64(self.rng_seed);
+ conn_shards.shuffle(&mut rng);
+
+ let mut j = 0;
+ for shard in 0..self.total_shards {
+ let shard = shard as u16;
+ if conn_shards.contains(&shard) {
+ continue;
+ }
+ let conn_idx = j % conn_shards.len();
+ mappings.push(ShardAssignment {
+ replica,
+ shard,
+ conn_shard: conn_shards[conn_idx],
+ });
+ j += 1;
+ }
+ }
+}
+
+/// Identifies a connection on a specific shard
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct ConnectionAssignment {
+ replica: u8,
+ shard: u16,
+}
+
+/// Maps a source shard to the shard that owns the connection
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct ShardAssignment {
+ replica: u8,
+ shard: u16,
+ conn_shard: u16,
+}
+
+/// Changeset for connection-based allocation
+#[derive(Debug, Clone)]
+pub enum ConnectionChanges {
+ Allocate {
+ connections: Vec<ConnectionAssignment>,
+ mappings: Vec<ShardAssignment>,
+ },
+ Deallocate {
+ connections: Vec<ConnectionAssignment>,
+ mappings: Vec<ConnectionAssignment>,
+ },
+}
+
+type Delta = <ConnectionCache as ShardedState>::Delta;
+impl AllocationStrategy<ConnectionCache> for LeastLoadedStrategy {
+ fn allocate(&self, replica: u8) -> Option<Delta> {
+ if self.replica_to_shards.borrow().contains_key(&replica) {
+ return None;
+ }
+
+ let mut connections = Vec::new();
+ let mut mappings = Vec::new();
+ let connections_needed =
self.total_shards.min(MAX_CONNECTIONS_PER_REPLICA);
+
+ let mut rng = StdRng::seed_from_u64(self.rng_seed);
+ self.connections_per_shard.borrow_mut().shuffle(&mut rng);
+ self.connections_per_shard
+ .borrow_mut()
+ .sort_by_key(|(_, count)| *count);
+
+ let mut assigned_shards = HashSet::with_capacity(connections_needed);
+
+ for i in 0..connections_needed {
+ let mut connections_per_shard =
self.connections_per_shard.borrow_mut();
+ let (shard, count) = connections_per_shard.get_mut(i).unwrap();
+ connections.push(ConnectionAssignment {
+ replica,
+ shard: *shard,
+ });
+ *count += 1;
+ assigned_shards.insert(*shard);
+ }
+
+ self.replica_to_shards
+ .borrow_mut()
+ .insert(replica, assigned_shards.clone());
+
+ self.create_shard_mappings(
+ &mut mappings,
+ replica,
+ assigned_shards.into_iter().collect(),
+ );
+
+ Some(Delta::Allocate {
+ connections,
+ mappings,
+ })
+ }
+
+ fn deallocate(&self, replica: u8) -> Option<Delta> {
+ let conn_shards =
self.replica_to_shards.borrow_mut().remove(&replica)?;
+
+ let mut connections = Vec::new();
+ let mut mappings = Vec::new();
+
+ for shard in &conn_shards {
+ if let Some((_, count)) = self
+ .connections_per_shard
+ .borrow_mut()
+ .iter_mut()
+ .find(|(s, _)| s == shard)
+ {
+ *count = count.saturating_sub(1);
+ }
+ connections.push(ConnectionAssignment {
+ replica,
+ shard: *shard,
+ });
+ }
+
+ for shard in 0..self.total_shards {
+ let shard = shard as u16;
+ mappings.push(ConnectionAssignment { replica, shard });
+ }
+
+ Some(Delta::Deallocate {
+ connections,
+ mappings,
+ })
+ }
+}
+
+/// Coordinator that wraps a strategy for a specific sharded state type
+pub struct Coordinator<A, SS>
+where
+ SS: ShardedState,
+ A: AllocationStrategy<SS>,
+{
+ strategy: A,
+ _ss: std::marker::PhantomData<SS>,
+}
+
+impl<A, SS> Coordinator<A, SS>
+where
+ SS: ShardedState,
+ A: AllocationStrategy<SS>,
+{
+ pub fn new(strategy: A) -> Self {
+ Self {
+ strategy,
+ _ss: std::marker::PhantomData,
+ }
+ }
+
+ pub fn allocate(&self, entry: SS::Entry) -> Option<SS::Delta> {
+ self.strategy.allocate(entry)
+ }
+
+ pub fn deallocate(&self, entry: SS::Entry) -> Option<SS::Delta> {
+ self.strategy.deallocate(entry)
+ }
+}
+
+pub struct ShardedConnections<A, SS>
+where
+ SS: ShardedState,
+ A: AllocationStrategy<SS>,
+{
+ pub coordinator: Coordinator<A, SS>,
+ pub state: SS,
+}
+
+impl<A, SS> ShardedConnections<A, SS>
+where
+ SS: ShardedState,
+ A: AllocationStrategy<SS>,
+{
+ pub fn allocate(&mut self, entry: SS::Entry) -> bool {
+ if let Some(delta) = self.coordinator.allocate(entry) {
+ // TODO: broadcast to other shards.
+ self.state.apply(delta);
+ true
+ } else {
+ false
+ }
+ }
+
+ pub fn deallocate(&mut self, entry: SS::Entry) -> bool {
+ if let Some(delta) = self.coordinator.deallocate(entry) {
+ // TODO: broadcast to other shards.
+ self.state.apply(delta);
+ true
+ } else {
+ false
+ }
+ }
+}
+
+/// Cache for connection state per shard
+#[derive(Default)]
+pub struct ConnectionCache {
+ pub shard_id: u16,
+ pub connections: HashMap<u8, Option<Rc<TcpSender>>>,
+ pub connection_map: HashMap<u8, u16>,
+}
+
+impl ConnectionCache {
+ pub fn get_connection(&self, replica: u8) -> Option<Rc<TcpSender>> {
+ self.connections.get(&replica).and_then(|opt| opt.clone())
+ }
+
+ pub fn get_mapped_shard(&self, replica: u8) -> Option<u16> {
+ self.connection_map.get(&replica).copied()
+ }
+}
+
+impl ShardedState for ConnectionCache {
+ type Entry = u8; // replica id
+ type Delta = ConnectionChanges;
+
+ fn apply(&mut self, delta: Self::Delta) {
+ let shard_id = self.shard_id;
+ match delta {
+ ConnectionChanges::Allocate {
+ connections,
+ mappings,
+ } => {
+ for conn in connections.iter().filter(|c| c.shard == shard_id)
{
+ self.connections.insert(conn.replica, None);
+ }
+ for mapping in &mappings {
+ self.connection_map
+ .insert(mapping.replica, mapping.conn_shard);
+ }
+ }
+ ConnectionChanges::Deallocate {
+ connections,
+ mappings,
+ } => {
+ for conn in connections.iter().filter(|c| c.shard == shard_id)
{
+ self.connections.remove(&conn.replica);
+ }
+ for mapping in &mappings {
+ self.connection_map.remove(&mapping.replica);
+ }
+ }
+ }
+ }
+}
diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/cache/mod.rs
similarity index 67%
copy from core/message_bus/src/lib.rs
copy to core/message_bus/src/cache/mod.rs
index 974061d6c..29ffaec68 100644
--- a/core/message_bus/src/lib.rs
+++ b/core/message_bus/src/cache/mod.rs
@@ -15,8 +15,16 @@
// specific language governing permissions and limitations
// under the License.
-pub trait MessageBus {}
+use crate::cache::connection::ShardedState;
-pub struct IggyMessageBus {}
+// TODO: Move to some common trait location.
+/// Allocation strategy that produces deltas for a specific sharded state type
+pub trait AllocationStrategy<SS>
+where
+ SS: ShardedState,
+{
+ fn allocate(&self, entry: SS::Entry) -> Option<SS::Delta>;
+ fn deallocate(&self, entry: SS::Entry) -> Option<SS::Delta>;
+}
-impl MessageBus for IggyMessageBus {}
+pub(crate) mod connection;
diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/lib.rs
index 974061d6c..c578a1fb0 100644
--- a/core/message_bus/src/lib.rs
+++ b/core/message_bus/src/lib.rs
@@ -14,9 +14,117 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
+mod cache;
-pub trait MessageBus {}
+use crate::cache::connection::{
+ ConnectionCache, Coordinator, LeastLoadedStrategy, ShardedConnections,
+};
+use iggy_common::{IggyError, SenderKind, TcpSender, header::GenericHeader,
message::Message};
+use std::{collections::HashMap, rc::Rc};
-pub struct IggyMessageBus {}
+/// Message bus parameterized by allocation strategy and sharded state
+pub trait MessageBus {
+ type Client;
+ type Replica;
+ type Data;
-impl MessageBus for IggyMessageBus {}
+ fn add_client(&mut self, client: Self::Client, sender: SenderKind) -> bool;
+ fn remove_client(&mut self, client: Self::Client) -> bool;
+
+ fn add_replica(&mut self, replica: Self::Replica) -> bool;
+ fn remove_replica(&mut self, replica: Self::Replica) -> bool;
+
+ // TODO: refactor consesus headers.
+ fn send_to_client(
+ &self,
+ client_id: Self::Client,
+ data: Self::Data,
+ ) -> impl Future<Output = Result<(), IggyError>>;
+ fn send_to_replica(
+ &self,
+ replica: Self::Replica,
+ data: Self::Data,
+ ) -> impl Future<Output = Result<(), IggyError>>;
+}
+
+// TODO: explore generics for Strategy
+pub struct IggyMessageBus {
+ clients: HashMap<u128, SenderKind>,
+ replicas: ShardedConnections<LeastLoadedStrategy, ConnectionCache>,
+ shard_id: u16,
+}
+
+impl IggyMessageBus {
+ pub fn new(total_shards: usize, shard_id: u16, seed: u64) -> Self {
+ Self {
+ clients: HashMap::new(),
+ replicas: ShardedConnections {
+ coordinator:
Coordinator::new(LeastLoadedStrategy::new(total_shards, seed)),
+ state: ConnectionCache {
+ shard_id,
+ ..Default::default()
+ },
+ },
+ shard_id,
+ }
+ }
+
+ pub fn get_replica_connection(&self, replica: u8) -> Option<Rc<TcpSender>>
{
+ let mapped_shard = self.replicas.state.get_mapped_shard(replica)?;
+ if mapped_shard == self.shard_id {
+ self.replicas.state.get_connection(replica)
+ } else {
+ None
+ }
+ }
+}
+
+impl MessageBus for IggyMessageBus {
+ type Client = u128;
+ type Replica = u8;
+ type Data = Message<GenericHeader>;
+
+ fn add_client(&mut self, client: Self::Client, sender: SenderKind) -> bool
{
+ if self.clients.contains_key(&client) {
+ return false;
+ }
+ self.clients.insert(client, sender);
+ true
+ }
+
+ fn remove_client(&mut self, client: Self::Client) -> bool {
+ self.clients.remove(&client).is_some()
+ }
+
+ fn add_replica(&mut self, replica: Self::Replica) -> bool {
+ self.replicas.allocate(replica)
+ }
+
+ fn remove_replica(&mut self, replica: Self::Replica) -> bool {
+ self.replicas.deallocate(replica)
+ }
+
+ async fn send_to_client(
+ &self,
+ client_id: Self::Client,
+ _message: Self::Data,
+ ) -> Result<(), IggyError> {
+ let _sender = self
+ .clients
+ .get(&client_id)
+ .ok_or(IggyError::ClientNotFound(client_id as u32))?;
+ Ok(())
+ }
+
+ async fn send_to_replica(
+ &self,
+ replica: Self::Replica,
+ _message: Self::Data,
+ ) -> Result<(), IggyError> {
+ // TODO: Handle lazily creating the connection.
+ let _connection = self
+ .get_replica_connection(replica)
+ .ok_or(IggyError::ResourceNotFound(format!("Replica {}",
replica)))?;
+ Ok(())
+ }
+}