This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new cc3746448 feat(simulator): implement bare bone version of simulator
(#2688)
cc3746448 is described below
commit cc37464487321d1b1b44952db6c6d1106b978cde
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Mon Feb 9 11:18:10 2026 +0100
feat(simulator): implement bare bone version of simulator (#2688)
Bare-bones version of the simulator; further progress is blocked by #2687.
---
.github/workflows/_common.yml | 1 +
Cargo.lock | 14 +++
Cargo.toml | 1 +
DEPENDENCIES.md | 1 +
core/common/src/sender/mod.rs | 1 +
core/common/src/types/consensus/header.rs | 28 ++++-
core/common/src/types/consensus/message.rs | 71 ++++-------
core/consensus/src/impls.rs | 102 +++++++++-------
core/consensus/src/lib.rs | 18 +--
core/consensus/src/vsr_timeout.rs | 1 +
core/journal/src/lib.rs | 12 +-
core/message_bus/src/cache/connection.rs | 5 +-
core/message_bus/src/lib.rs | 7 +-
core/metadata/src/impls/metadata.rs | 184 +++++++++++++++++-----------
core/metadata/src/lib.rs | 8 +-
core/metadata/src/stm/mod.rs | 34 ++++--
core/metadata/src/stm/mux.rs | 20 ++--
core/metadata/src/stm/stream.rs | 1 +
core/simulator/Cargo.toml | 31 +++++
core/simulator/src/bus.rs | 186 +++++++++++++++++++++++++++++
core/simulator/src/client.rs | 99 +++++++++++++++
core/simulator/src/deps.rs | 155 ++++++++++++++++++++++++
core/simulator/src/lib.rs | 165 +++++++++++++++++++++++++
core/simulator/src/main.rs | 100 ++++++++++++++++
core/simulator/src/replica.rs | 67 +++++++++++
25 files changed, 1113 insertions(+), 199 deletions(-)
diff --git a/.github/workflows/_common.yml b/.github/workflows/_common.yml
index 3e2cc20cf..2a1a3aa7e 100644
--- a/.github/workflows/_common.yml
+++ b/.github/workflows/_common.yml
@@ -107,6 +107,7 @@ jobs:
metadata
message_bus
storage
+ simulator
configs
license-headers:
diff --git a/Cargo.lock b/Cargo.lock
index 19ff17599..13a4dd133 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8433,6 +8433,20 @@ dependencies = [
"time",
]
+[[package]]
+name = "simulator"
+version = "0.1.0"
+dependencies = [
+ "bytemuck",
+ "bytes",
+ "consensus",
+ "futures",
+ "iggy_common",
+ "journal",
+ "message_bus",
+ "metadata",
+]
+
[[package]]
name = "siphasher"
version = "1.0.1"
diff --git a/Cargo.toml b/Cargo.toml
index 115b034c2..3566333b2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -50,6 +50,7 @@ members = [
"core/partitions",
"core/sdk",
"core/server",
+ "core/simulator",
"core/tools",
"examples/rust",
]
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 673532693..be466b0d5 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -730,6 +730,7 @@ simd-adler32: 0.3.8, "MIT",
simd-json: 0.17.0, "Apache-2.0 OR MIT",
simdutf8: 0.1.5, "Apache-2.0 OR MIT",
simple_asn1: 0.6.3, "ISC",
+simulator: 0.1.0, "N/A",
siphasher: 1.0.1, "Apache-2.0 OR MIT",
slab: 0.4.11, "MIT",
smallvec: 1.15.1, "Apache-2.0 OR MIT",
diff --git a/core/common/src/sender/mod.rs b/core/common/src/sender/mod.rs
index 11c13bc70..50b426460 100644
--- a/core/common/src/sender/mod.rs
+++ b/core/common/src/sender/mod.rs
@@ -86,6 +86,7 @@ pub trait Sender {
}
#[allow(clippy::large_enum_variant)]
+#[derive(Debug)]
pub enum SenderKind {
Tcp(TcpSender),
TcpTls(TcpTlsSender),
diff --git a/core/common/src/types/consensus/header.rs
b/core/common/src/types/consensus/header.rs
index 376b6be05..8f9538557 100644
--- a/core/common/src/types/consensus/header.rs
+++ b/core/common/src/types/consensus/header.rs
@@ -114,6 +114,8 @@ pub enum Operation {
UpdatePermissions = 145,
CreatePersonalAccessToken = 146,
DeletePersonalAccessToken = 147,
+
+ Reserved = 200,
}
#[repr(C)]
@@ -172,6 +174,30 @@ pub struct RequestHeader {
pub reserved: [u8; 95],
}
+impl Default for RequestHeader {
+ fn default() -> Self {
+ Self {
+ reserved: [0; 95],
+ checksum: 0,
+ checksum_body: 0,
+ cluster: 0,
+ size: 0,
+ epoch: 0,
+ view: 0,
+ release: 0,
+ protocol: 0,
+ command: Default::default(),
+ replica: 0,
+ reserved_frame: [0; 12],
+ client: 0,
+ request_checksum: 0,
+ timestamp: 0,
+ request: 0,
+ operation: Default::default(),
+ }
+ }
+}
+
unsafe impl Pod for RequestHeader {}
unsafe impl Zeroable for RequestHeader {}
@@ -348,7 +374,7 @@ impl ConsensusHeader for CommitHeader {
}
#[repr(C)]
-#[derive(Debug, Clone, Copy)]
+#[derive(Default, Debug, Clone, Copy)]
pub struct ReplyHeader {
pub checksum: u128,
pub checksum_body: u128,
diff --git a/core/common/src/types/consensus/message.rs
b/core/common/src/types/consensus/message.rs
index ac32cc3c8..5fe7e0843 100644
--- a/core/common/src/types/consensus/message.rs
+++ b/core/common/src/types/consensus/message.rs
@@ -15,8 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-use crate::types::consensus::header::{
- self, CommitHeader, ConsensusHeader, GenericHeader, PrepareHeader,
PrepareOkHeader, ReplyHeader,
+use crate::{
+ header::RequestHeader,
+ types::consensus::header::{self, ConsensusHeader, PrepareHeader,
PrepareOkHeader},
};
use bytes::Bytes;
use std::marker::PhantomData;
@@ -278,33 +279,36 @@ where
#[derive(Debug)]
#[allow(unused)]
pub enum MessageBag {
- Generic(Message<GenericHeader>),
+ Request(Message<RequestHeader>),
Prepare(Message<PrepareHeader>),
PrepareOk(Message<PrepareOkHeader>),
- Commit(Message<CommitHeader>),
- Reply(Message<ReplyHeader>),
}
impl MessageBag {
#[allow(unused)]
pub fn command(&self) -> header::Command2 {
match self {
- MessageBag::Generic(message) => message.header().command,
+ MessageBag::Request(message) => message.header().command,
MessageBag::Prepare(message) => message.header().command,
MessageBag::PrepareOk(message) => message.header().command,
- MessageBag::Commit(message) => message.header().command,
- MessageBag::Reply(message) => message.header().command,
}
}
#[allow(unused)]
pub fn size(&self) -> u32 {
match self {
- MessageBag::Generic(message) => message.header().size(),
+ MessageBag::Request(message) => message.header().size(),
MessageBag::Prepare(message) => message.header().size(),
MessageBag::PrepareOk(message) => message.header().size(),
- MessageBag::Commit(message) => message.header().size(),
- MessageBag::Reply(message) => message.header().size(),
+ }
+ }
+
+ #[allow(unused)]
+ pub fn operation(&self) -> header::Operation {
+ match self {
+ MessageBag::Request(message) => message.header().operation,
+ MessageBag::Prepare(message) => message.header().operation,
+ MessageBag::PrepareOk(message) => message.header().operation,
}
}
}
@@ -326,17 +330,17 @@ where
unsafe {
Message::<header::PrepareHeader>::from_buffer_unchecked(buffer) };
MessageBag::Prepare(msg)
}
- header::Command2::Commit => {
- let msg = unsafe {
Message::<header::CommitHeader>::from_buffer_unchecked(buffer) };
- MessageBag::Commit(msg)
+ header::Command2::Request => {
+ let msg =
+ unsafe {
Message::<header::RequestHeader>::from_buffer_unchecked(buffer) };
+ MessageBag::Request(msg)
}
- header::Command2::Reply => {
- let msg = unsafe {
Message::<header::ReplyHeader>::from_buffer_unchecked(buffer) };
- MessageBag::Reply(msg)
+ header::Command2::PrepareOk => {
+ let msg =
+ unsafe {
Message::<header::PrepareOkHeader>::from_buffer_unchecked(buffer) };
+ MessageBag::PrepareOk(msg)
}
- _ => unreachable!(
- "For now we only support Prepare, Commit, and Reply. In the
future we will support more commands. Command2: {command:?}"
- ),
+ _ => unreachable!(),
}
}
}
@@ -500,33 +504,6 @@ mod tests {
assert_eq!(bag.command(), header::Command2::Prepare);
assert!(matches!(bag, MessageBag::Prepare(_)));
- assert!(!matches!(bag, MessageBag::Commit(_)));
- assert!(!matches!(bag, MessageBag::Reply(_)));
- assert!(!matches!(bag, MessageBag::Generic(_)));
- }
-
- #[test]
- fn test_message_bag_from_commit() {
- let commit = header::CommitHeader::create_test();
- let bag = MessageBag::from(commit);
-
- assert_eq!(bag.command(), header::Command2::Commit);
- assert!(!matches!(bag, MessageBag::Prepare(_)));
- assert!(matches!(bag, MessageBag::Commit(_)));
- assert!(!matches!(bag, MessageBag::Reply(_)));
- assert!(!matches!(bag, MessageBag::Generic(_)));
- }
-
- #[test]
- fn test_message_bag_from_reply() {
- let reply = header::ReplyHeader::create_test();
- let bag = MessageBag::from(reply);
-
- assert_eq!(bag.command(), header::Command2::Reply);
- assert!(!matches!(bag, MessageBag::Prepare(_)));
- assert!(!matches!(bag, MessageBag::Commit(_)));
- assert!(matches!(bag, MessageBag::Reply(_)));
- assert!(!matches!(bag, MessageBag::Generic(_)));
}
}
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 154a6c3a4..36b595163 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -27,6 +27,7 @@ use iggy_common::header::{
};
use iggy_common::message::Message;
use message_bus::IggyMessageBus;
+use message_bus::MessageBus;
use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
@@ -43,6 +44,7 @@ pub trait Sequencer {
fn set_sequence(&self, sequence: Self::Sequence);
}
+#[derive(Debug)]
pub struct LocalSequencer {
op: Cell<u64>,
}
@@ -82,6 +84,7 @@ pub const PIPELINE_PREPARE_QUEUE_MAX: usize = 8;
/// Maximum number of replicas in a cluster.
pub const REPLICAS_MAX: usize = 32;
+#[derive(Debug)]
pub struct PipelineEntry {
pub message: Message<PrepareHeader>,
/// Bitmap of replicas that have acknowledged this prepare.
@@ -133,6 +136,7 @@ impl RequestEntry {
}
}
+#[derive(Debug)]
pub struct LocalPipeline {
/// Messages being prepared (uncommitted and being replicated).
prepare_queue: VecDeque<PipelineEntry>,
@@ -218,12 +222,8 @@ impl LocalPipeline {
self.prepare_queue.back()
}
- /// Find a message by op number and checksum.
- pub fn message_by_op_and_checksum(
- &mut self,
- op: u64,
- checksum: u128,
- ) -> Option<&mut PipelineEntry> {
+ /// Find a message by op number and checksum (immutable).
+ pub fn message_by_op_and_checksum(&self, op: u64, checksum: u128) ->
Option<&PipelineEntry> {
let head_op = self.prepare_queue.front()?.message.header().op;
let tail_op = self.prepare_queue.back()?.message.header().op;
@@ -239,7 +239,7 @@ impl LocalPipeline {
}
let index = (op - head_op) as usize;
- let entry = self.prepare_queue.get_mut(index)?;
+ let entry = self.prepare_queue.get(index)?;
debug_assert_eq!(entry.message.header().op, op);
@@ -320,6 +320,20 @@ impl LocalPipeline {
pub fn clear(&mut self) {
self.prepare_queue.clear();
}
+
+ /// Extract and remove a message by op number.
+ /// Returns None if op is not in the pipeline.
+ pub fn extract_by_op(&mut self, op: u64) -> Option<PipelineEntry> {
+ let head_op = self.prepare_queue.front()?.message.header().op;
+ if op < head_op {
+ return None;
+ }
+ let index = (op - head_op) as usize;
+ if index >= self.prepare_queue.len() {
+ return None;
+ }
+ self.prepare_queue.remove(index)
+ }
}
impl Pipeline for LocalPipeline {
@@ -334,15 +348,23 @@ impl Pipeline for LocalPipeline {
LocalPipeline::pop_message(self)
}
+ fn extract_by_op(&mut self, op: u64) -> Option<Self::Entry> {
+ LocalPipeline::extract_by_op(self, op)
+ }
+
fn clear(&mut self) {
LocalPipeline::clear(self)
}
+ fn message_by_op(&self, op: u64) -> Option<&Self::Entry> {
+ LocalPipeline::message_by_op(self, op)
+ }
+
fn message_by_op_mut(&mut self, op: u64) -> Option<&mut Self::Entry> {
LocalPipeline::message_by_op_mut(self, op)
}
- fn message_by_op_and_checksum(&mut self, op: u64, checksum: u128) ->
Option<&mut Self::Entry> {
+ fn message_by_op_and_checksum(&self, op: u64, checksum: u128) ->
Option<&Self::Entry> {
LocalPipeline::message_by_op_and_checksum(self, op, checksum)
}
@@ -386,7 +408,11 @@ pub enum VsrAction {
}
#[allow(unused)]
-pub struct VsrConsensus {
+#[derive(Debug)]
+pub struct VsrConsensus<B = IggyMessageBus>
+where
+ B: MessageBus,
+{
cluster: u128,
replica: u8,
replica_count: u8,
@@ -412,7 +438,7 @@ pub struct VsrConsensus {
pipeline: RefCell<LocalPipeline>,
- message_bus: IggyMessageBus,
+ message_bus: B,
// TODO: Add loopback_queue for messages to self
/// Tracks start view change messages received from all replicas
(including self)
start_view_change_from_all_replicas: RefCell<BitSet<u32>>,
@@ -430,8 +456,8 @@ pub struct VsrConsensus {
timeouts: RefCell<TimeoutManager>,
}
-impl VsrConsensus {
- pub fn new(cluster: u128, replica: u8, replica_count: u8) -> Self {
+impl<B: MessageBus> VsrConsensus<B> {
+ pub fn new(cluster: u128, replica: u8, replica_count: u8, message_bus: B)
-> Self {
assert!(
replica < replica_count,
"replica index must be < replica_count"
@@ -449,7 +475,7 @@ impl VsrConsensus {
last_timestamp: Cell::new(0),
last_prepare_checksum: Cell::new(0),
pipeline: RefCell::new(LocalPipeline::new()),
- message_bus: IggyMessageBus::new(replica_count as usize, replica
as u16, 0),
+ message_bus,
start_view_change_from_all_replicas:
RefCell::new(BitSet::with_capacity(REPLICAS_MAX)),
do_view_change_from_all_replicas:
RefCell::new(dvc_quorum_array_empty()),
do_view_change_quorum: Cell::new(false),
@@ -459,6 +485,11 @@ impl VsrConsensus {
}
}
+ // TODO: More init logic.
+ pub fn init(&self) {
+ self.status.set(Status::Normal);
+ }
+
pub fn primary_index(&self, view: u32) -> u8 {
view as u8 % self.replica_count
}
@@ -995,9 +1026,9 @@ impl VsrConsensus {
/// Called on the primary when a follower acknowledges a prepare.
///
/// Returns true if quorum was just reached for this op.
- pub fn handle_prepare_ok(&self, message: Message<PrepareOkHeader>) -> bool
{
- let header = message.header();
-
+ /// Handle a PrepareOk message. Returns true if quorum was reached.
+ /// Note: Caller (on_ack) should validate is_primary and status before
calling.
+ pub fn handle_prepare_ok(&self, header: &PrepareOkHeader) -> bool {
assert_eq!(header.command, Command2::PrepareOk);
assert!(
header.replica < self.replica_count,
@@ -1005,26 +1036,16 @@ impl VsrConsensus {
header.replica
);
- // Ignore if not in normal status
- if self.status() != Status::Normal {
- return false;
- }
-
// Ignore if from older view
if header.view < self.view() {
return false;
}
- // Ignore if from newer view. This shouldn't happen if we're primary
+ // Ignore if from newer view
if header.view > self.view() {
return false;
}
- // We must be primary to process prepare_ok
- if !self.is_primary() {
- return false;
- }
-
// Ignore if syncing
if self.is_syncing() {
return false;
@@ -1043,9 +1064,6 @@ impl VsrConsensus {
return false;
}
- // Verify the prepare is for a valid op range
- let _commit = self.commit();
-
// Check for duplicate ack
if entry.has_ack(header.replica) {
return false;
@@ -1058,22 +1076,19 @@ impl VsrConsensus {
// Check if we've reached quorum
if ack_count >= quorum && !entry.ok_quorum_received {
entry.ok_quorum_received = true;
-
return true;
}
false
}
- pub fn message_bus(&self) -> &IggyMessageBus {
+ pub fn message_bus(&self) -> &B {
&self.message_bus
}
}
-impl Project<Message<PrepareHeader>> for Message<RequestHeader> {
- type Consensus = VsrConsensus;
-
- fn project(self, consensus: &Self::Consensus) -> Message<PrepareHeader> {
+impl<B: MessageBus> Project<Message<PrepareHeader>, VsrConsensus<B>> for
Message<RequestHeader> {
+ fn project(self, consensus: &VsrConsensus<B>) -> Message<PrepareHeader> {
let op = consensus.sequencer.current_sequence() + 1;
self.transmute_header(|old, new| {
@@ -1085,6 +1100,7 @@ impl Project<Message<PrepareHeader>> for
Message<RequestHeader> {
release: old.release,
command: Command2::Prepare,
replica: consensus.replica,
+ client: old.client,
parent: 0, // TODO: Get parent checksum from the previous
entry in the journal (figure out how to pass that ctx here)
request_checksum: old.request_checksum,
request: old.request,
@@ -1098,10 +1114,8 @@ impl Project<Message<PrepareHeader>> for
Message<RequestHeader> {
}
}
-impl Project<Message<PrepareOkHeader>> for Message<PrepareHeader> {
- type Consensus = VsrConsensus;
-
- fn project(self, consensus: &Self::Consensus) -> Message<PrepareOkHeader> {
+impl<B: MessageBus> Project<Message<PrepareOkHeader>, VsrConsensus<B>> for
Message<PrepareHeader> {
+ fn project(self, consensus: &VsrConsensus<B>) -> Message<PrepareOkHeader> {
self.transmute_header(|old, new| {
*new = PrepareOkHeader {
command: Command2::PrepareOk,
@@ -1124,8 +1138,8 @@ impl Project<Message<PrepareOkHeader>> for
Message<PrepareHeader> {
}
}
-impl Consensus for VsrConsensus {
- type MessageBus = IggyMessageBus;
+impl<B: MessageBus> Consensus for VsrConsensus<B> {
+ type MessageBus = B;
type RequestMessage = Message<RequestHeader>;
type ReplicateMessage = Message<PrepareHeader>;
@@ -1162,9 +1176,9 @@ impl Consensus for VsrConsensus {
// verify op is sequential
assert_eq!(
header.op,
- self.sequencer.current_sequence() + 1,
+ self.sequencer.current_sequence(),
"op must be sequential: expected {}, got {}",
- self.sequencer.current_sequence() + 1,
+ self.sequencer.current_sequence(),
header.op
);
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index 1cf323d94..5c3e152e6 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -17,9 +17,8 @@
use message_bus::MessageBus;
-pub trait Project<T> {
- type Consensus: Consensus;
- fn project(self, consensus: &Self::Consensus) -> T;
+pub trait Project<T, C: Consensus> {
+ fn project(self, consensus: &C) -> T;
}
pub trait Pipeline {
@@ -30,11 +29,16 @@ pub trait Pipeline {
fn pop_message(&mut self) -> Option<Self::Entry>;
+ /// Extract and remove a message by op number.
+ fn extract_by_op(&mut self, op: u64) -> Option<Self::Entry>;
+
fn clear(&mut self);
+ fn message_by_op(&self, op: u64) -> Option<&Self::Entry>;
+
fn message_by_op_mut(&mut self, op: u64) -> Option<&mut Self::Entry>;
- fn message_by_op_and_checksum(&mut self, op: u64, checksum: u128) ->
Option<&mut Self::Entry>;
+ fn message_by_op_and_checksum(&self, op: u64, checksum: u128) ->
Option<&Self::Entry>;
fn is_full(&self) -> bool;
@@ -43,11 +47,11 @@ pub trait Pipeline {
fn verify(&self);
}
-pub trait Consensus {
+pub trait Consensus: Sized {
type MessageBus: MessageBus;
// I am wondering, whether we should create a dedicated trait for cloning,
so it's explicit that we do ref counting.
- type RequestMessage: Project<Self::ReplicateMessage, Consensus = Self> +
Clone;
- type ReplicateMessage: Project<Self::AckMessage, Consensus = Self> + Clone;
+ type RequestMessage: Project<Self::ReplicateMessage, Self> + Clone;
+ type ReplicateMessage: Project<Self::AckMessage, Self> + Clone;
type AckMessage;
type Sequencer: Sequencer;
type Pipeline: Pipeline<Message = Self::ReplicateMessage>;
diff --git a/core/consensus/src/vsr_timeout.rs
b/core/consensus/src/vsr_timeout.rs
index 140d40f02..d5cda482e 100644
--- a/core/consensus/src/vsr_timeout.rs
+++ b/core/consensus/src/vsr_timeout.rs
@@ -109,6 +109,7 @@ pub enum TimeoutKind {
/// Manager for all VSR timeouts of a replica.
#[allow(unused)]
+#[derive(Debug)]
pub struct TimeoutManager {
ping: Timeout,
prepare: Timeout,
diff --git a/core/journal/src/lib.rs b/core/journal/src/lib.rs
index b091bcd3d..07056416f 100644
--- a/core/journal/src/lib.rs
+++ b/core/journal/src/lib.rs
@@ -24,21 +24,19 @@ where
type Header;
type Entry;
- fn entry(&self, header: &Self::Header) -> Option<&Self::Entry>;
-
- fn previous_entry(&self, header: &Self::Header) -> Option<Self::Header>;
-
- fn set_header_as_dirty(&self, header: &Self::Header);
+ fn header(&self, idx: usize) -> Option<&Self::Header>;
+ fn previous_header(&self, header: &Self::Header) -> Option<&Self::Header>;
fn append(&self, entry: Self::Entry) -> impl Future<Output = ()>;
+ fn entry(&self, header: &Self::Header) -> impl Future<Output =
Option<Self::Entry>>;
}
// TODO: Move to other crate.
pub trait Storage {
type Buffer;
- fn write(&self, buf: Self::Buffer) -> usize;
- fn read(&self, offset: u64, buffer: Self::Buffer) -> Self::Buffer;
+ fn write(&self, buf: Self::Buffer) -> impl Future<Output = usize>;
+ fn read(&self, offset: usize, buffer: Self::Buffer) -> impl Future<Output
= Self::Buffer>;
}
pub trait JournalHandle {
diff --git a/core/message_bus/src/cache/connection.rs
b/core/message_bus/src/cache/connection.rs
index d07ac66ec..633dc2280 100644
--- a/core/message_bus/src/cache/connection.rs
+++ b/core/message_bus/src/cache/connection.rs
@@ -34,6 +34,7 @@ pub trait ShardedState {
}
/// Least-loaded allocation strategy for connections
+#[derive(Debug)]
pub struct LeastLoadedStrategy {
total_shards: usize,
connections_per_shard: RefCell<Vec<(u16, usize)>>,
@@ -193,6 +194,7 @@ impl AllocationStrategy<ConnectionCache> for
LeastLoadedStrategy {
}
/// Coordinator that wraps a strategy for a specific sharded state type
+#[derive(Debug)]
pub struct Coordinator<A, SS>
where
SS: ShardedState,
@@ -223,6 +225,7 @@ where
}
}
+#[derive(Debug)]
pub struct ShardedConnections<A, SS>
where
SS: ShardedState,
@@ -259,7 +262,7 @@ where
}
/// Cache for connection state per shard
-#[derive(Default)]
+#[derive(Debug, Default)]
pub struct ConnectionCache {
pub shard_id: u16,
pub connections: HashMap<u8, Option<Rc<TcpSender>>>,
diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/lib.rs
index c578a1fb0..027fe9f56 100644
--- a/core/message_bus/src/lib.rs
+++ b/core/message_bus/src/lib.rs
@@ -27,8 +27,9 @@ pub trait MessageBus {
type Client;
type Replica;
type Data;
+ type Sender;
- fn add_client(&mut self, client: Self::Client, sender: SenderKind) -> bool;
+ fn add_client(&mut self, client: Self::Client, sender: Self::Sender) ->
bool;
fn remove_client(&mut self, client: Self::Client) -> bool;
fn add_replica(&mut self, replica: Self::Replica) -> bool;
@@ -48,6 +49,7 @@ pub trait MessageBus {
}
// TODO: explore generics for Strategy
+#[derive(Debug)]
pub struct IggyMessageBus {
clients: HashMap<u128, SenderKind>,
replicas: ShardedConnections<LeastLoadedStrategy, ConnectionCache>,
@@ -83,8 +85,9 @@ impl MessageBus for IggyMessageBus {
type Client = u128;
type Replica = u8;
type Data = Message<GenericHeader>;
+ type Sender = SenderKind;
- fn add_client(&mut self, client: Self::Client, sender: SenderKind) -> bool
{
+ fn add_client(&mut self, client: Self::Client, sender: Self::Sender) ->
bool {
if self.clients.contains_key(&client) {
return false;
}
diff --git a/core/metadata/src/impls/metadata.rs
b/core/metadata/src/impls/metadata.rs
index c8c7fdf90..c4e4b3da3 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -14,61 +14,63 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
+use crate::stm::StateMachine;
use consensus::{Consensus, Project, Sequencer, Status, VsrConsensus};
use iggy_common::{
- header::{Command2, PrepareHeader, PrepareOkHeader},
+ header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader,
ReplyHeader},
message::Message,
};
use journal::{Journal, JournalHandle};
use message_bus::MessageBus;
use tracing::{debug, warn};
-#[expect(unused)]
pub trait Metadata<C>
where
C: Consensus,
{
/// Handle a request message.
- fn on_request(&self, message: C::RequestMessage);
+ fn on_request(&self, message: C::RequestMessage) -> impl Future<Output =
()>;
/// Handle a replicate message (Prepare in VSR).
fn on_replicate(&self, message: C::ReplicateMessage) -> impl Future<Output
= ()>;
/// Handle an ack message (PrepareOk in VSR).
- fn on_ack(&self, message: C::AckMessage);
+ fn on_ack(&self, message: C::AckMessage) -> impl Future<Output = ()>;
}
-#[expect(unused)]
-struct IggyMetadata<C, J, S, M> {
+#[derive(Debug)]
+pub struct IggyMetadata<C, J, S, M> {
/// Some on shard0, None on other shards
- consensus: Option<C>,
+ pub consensus: Option<C>,
/// Some on shard0, None on other shards
- journal: Option<J>,
+ pub journal: Option<J>,
/// Some on shard0, None on other shards
- snapshot: Option<S>,
+ pub snapshot: Option<S>,
/// State machine - lives on all shards
- mux_stm: M,
+ pub mux_stm: M,
}
-impl<J, S, M> Metadata<VsrConsensus> for IggyMetadata<VsrConsensus, J, S, M>
+impl<B, J, S, M> Metadata<VsrConsensus<B>> for IggyMetadata<VsrConsensus<B>,
J, S, M>
where
+ B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
J: JournalHandle,
J::Target: Journal<
J::Storage,
- Entry = <VsrConsensus as Consensus>::ReplicateMessage,
+ Entry = <VsrConsensus<B> as Consensus>::ReplicateMessage,
Header = PrepareHeader,
>,
+ M: StateMachine<Input = Message<PrepareHeader>>,
{
- fn on_request(&self, message: <VsrConsensus as Consensus>::RequestMessage)
{
+ async fn on_request(&self, message: <VsrConsensus<B> as
Consensus>::RequestMessage) {
let consensus = self.consensus.as_ref().unwrap();
// TODO: Bunch of asserts.
debug!("handling metadata request");
let prepare = message.project(consensus);
- self.pipeline_prepare(prepare);
+ self.pipeline_prepare(prepare).await;
}
- async fn on_replicate(&self, message: <VsrConsensus as
Consensus>::ReplicateMessage) {
+ async fn on_replicate(&self, message: <VsrConsensus<B> as
Consensus>::ReplicateMessage) {
let consensus = self.consensus.as_ref().unwrap();
let journal = self.journal.as_ref().unwrap();
@@ -93,20 +95,6 @@ where
let current_op = consensus.sequencer().current_sequence();
- // Old message (handle as repair). Not replicating.
- if header.view < consensus.view()
- || (consensus.status() == Status::Normal
- && header.view == consensus.view()
- && header.op <= current_op)
- {
- debug!(
- replica = consensus.replica(),
- "on_replicate: ignoring (repair)"
- );
- self.on_repair(message);
- return;
- }
-
// If status is not normal, ignore the replicate.
if consensus.status() != Status::Normal {
warn!(
@@ -137,14 +125,13 @@ where
// TODO handle gap in ops.
// Verify hash chain integrity.
- if let Some(previous) = journal.handle().previous_entry(header) {
- self.panic_if_hash_chain_would_break_in_same_view(&previous,
header);
+ if let Some(previous) = journal.handle().previous_header(header) {
+ self.panic_if_hash_chain_would_break_in_same_view(previous,
header);
}
assert_eq!(header.op, current_op + 1);
consensus.sequencer().set_sequence(header.op);
- journal.handle().set_header_as_dirty(header);
// Append to journal.
journal.handle().append(message.clone()).await;
@@ -158,7 +145,7 @@ where
}
}
- fn on_ack(&self, message: <VsrConsensus as Consensus>::AckMessage) {
+ async fn on_ack(&self, message: <VsrConsensus<B> as
Consensus>::AckMessage) {
let consensus = self.consensus.as_ref().unwrap();
let header = message.header();
@@ -172,63 +159,117 @@ where
return;
}
- // Find the prepare in pipeline
- let mut pipeline = consensus.pipeline().borrow_mut();
- let Some(entry) = pipeline.message_by_op_and_checksum(header.op,
header.prepare_checksum)
- else {
- debug!("on_ack: prepare not in pipeline op={}", header.op);
- return;
- };
-
- // Verify checksum matches
- if entry.message.header().checksum != header.prepare_checksum {
- warn!("on_ack: checksum mismatch");
- return;
+ // Verify checksum by checking pipeline entry exists
+ {
+ let pipeline = consensus.pipeline().borrow();
+ let Some(entry) =
+ pipeline.message_by_op_and_checksum(header.op,
header.prepare_checksum)
+ else {
+ debug!("on_ack: prepare not in pipeline op={}", header.op);
+ return;
+ };
+
+ if entry.message.header().checksum != header.prepare_checksum {
+ warn!("on_ack: checksum mismatch");
+ return;
+ }
}
- // Record ack
- let count = entry.add_ack(header.replica);
-
- // Check quorum
- if count >= consensus.quorum() && !entry.ok_quorum_received {
- entry.ok_quorum_received = true;
+ // Let consensus handle the ack increment and quorum check
+ if consensus.handle_prepare_ok(header) {
debug!("on_ack: quorum received for op={}", header.op);
-
- // Advance commit number and trigger commit journal
consensus.advance_commit_number(header.op);
- self.commit_journal();
+
+ // Extract the prepare message from the pipeline by op
+ // TODO: Commit from the head. ALWAYS
+ let entry =
consensus.pipeline().borrow_mut().extract_by_op(header.op);
+ let Some(entry) = entry else {
+ warn!("on_ack: prepare not found in pipeline for op={}",
header.op);
+ return;
+ };
+
+ let prepare = entry.message;
+ let prepare_header = *prepare.header();
+
+ // Apply the state (consumes prepare)
+ // TODO: Handle appending result to response
+ let _result = self.mux_stm.update(prepare);
+ debug!("on_ack: state applied for op={}", prepare_header.op);
+
+ // TODO: Figure out better infra for this, it's messy.
+ let reply =
Message::<ReplyHeader>::new(std::mem::size_of::<ReplyHeader>())
+ .transmute_header(|_, new| {
+ *new = ReplyHeader {
+ checksum: 0,
+ checksum_body: 0,
+ cluster: consensus.cluster(),
+ size: std::mem::size_of::<ReplyHeader>() as u32,
+ epoch: prepare_header.epoch,
+ view: consensus.view(),
+ release: 0,
+ protocol: 0,
+ command: Command2::Reply,
+ replica: consensus.replica(),
+ reserved_frame: [0; 12],
+ request_checksum: prepare_header.request_checksum,
+ request_checksum_padding: 0,
+ context: 0,
+ context_padding: 0,
+ op: prepare_header.op,
+ commit: consensus.commit(),
+ timestamp: prepare_header.timestamp,
+ request: prepare_header.request,
+ operation: prepare_header.operation,
+ ..Default::default()
+ };
+ });
+
+ // Send reply to client
+ let generic_reply = reply.into_generic();
+ debug!(
+ "on_ack: sending reply to client={} for op={}",
+ prepare_header.client, prepare_header.op
+ );
+
+ // TODO: Error handling
+ consensus
+ .message_bus()
+ .send_to_client(prepare_header.client, generic_reply)
+ .await
+ .unwrap()
}
}
}
-impl<J, S, M> IggyMetadata<VsrConsensus, J, S, M>
+impl<B, J, S, M> IggyMetadata<VsrConsensus<B>, J, S, M>
where
+ B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
J: JournalHandle,
J::Target: Journal<
J::Storage,
- Entry = <VsrConsensus as Consensus>::ReplicateMessage,
+ Entry = <VsrConsensus<B> as Consensus>::ReplicateMessage,
Header = PrepareHeader,
>,
+ M: StateMachine<Input = Message<PrepareHeader>>,
{
- #[expect(unused)]
- fn pipeline_prepare(&self, prepare: Message<PrepareHeader>) {
+ async fn pipeline_prepare(&self, prepare: Message<PrepareHeader>) {
let consensus = self.consensus.as_ref().unwrap();
debug!("inserting prepare into metadata pipeline");
consensus.verify_pipeline();
consensus.pipeline_message(prepare.clone());
- self.on_replicate(prepare.clone());
+ self.on_replicate(prepare.clone()).await;
consensus.post_replicate_verify(&prepare);
}
fn fence_old_prepare(&self, prepare: &Message<PrepareHeader>) -> bool {
- let (Some(consensus), Some(journal)) = (&self.consensus,
&self.journal) else {
- todo!("dispatch fence_old_prepare to shard0");
- };
+ let consensus = self.consensus.as_ref().unwrap();
+ let journal = self.journal.as_ref().unwrap();
let header = prepare.header();
- header.op <= consensus.commit() ||
journal.handle().entry(header).is_some()
+ // TODO: Handle idx calculation, for now using header.op, but since
the journal may get compacted, this may not be correct.
+ header.op <= consensus.commit() || journal.handle().header(header.op
as usize).is_some()
}
/// Replicate a prepare message to the next replica in the chain.
@@ -243,9 +284,11 @@ where
let header = message.header();
+ // TODO: calculate the index;
+ let idx = header.op as usize;
assert_eq!(header.command, Command2::Prepare);
assert!(
- journal.handle().entry(header).is_none(),
+ journal.handle().header(idx).is_none(),
"replicate: must not already have prepare"
);
assert!(header.op > consensus.commit());
@@ -279,10 +322,6 @@ where
.unwrap();
}
- fn on_repair(&self, _message: Message<PrepareHeader>) {
- todo!()
- }
-
/// Verify hash chain would not break if we add this header.
fn panic_if_hash_chain_would_break_in_same_view(
&self,
@@ -306,7 +345,6 @@ where
// TODO: Implement commit logic
// Walk through journal from last committed to current commit number
// Apply each entry to the state machine
- todo!()
}
/// Send a prepare_ok message to the primary.
@@ -335,7 +373,7 @@ where
}
// Verify we have the prepare and it's persisted (not dirty).
- if journal.handle().entry(header).is_none() {
+ if journal.handle().header(header.op as usize).is_none() {
debug!(
replica = consensus.replica(),
op = header.op,
@@ -398,6 +436,12 @@ where
"send_prepare_ok: loopback to self"
);
// TODO: Queue for self-processing or call handle_prepare_ok
directly
+ // TODO: This is temporal, to test simulator, but we should send
message to ourselves properly.
+ consensus
+ .message_bus()
+ .send_to_replica(primary, generic_message)
+ .await
+ .unwrap();
} else {
debug!(
replica = consensus.replica(),
diff --git a/core/metadata/src/lib.rs b/core/metadata/src/lib.rs
index 824cf614f..08c137fff 100644
--- a/core/metadata/src/lib.rs
+++ b/core/metadata/src/lib.rs
@@ -17,8 +17,14 @@
//! Iggy metadata module
-mod impls;
+pub mod impls;
pub mod permissioner;
pub mod stm;
mod stats;
+
+// Re-export IggyMetadata and Metadata trait for use in other modules
+pub use impls::metadata::{IggyMetadata, Metadata};
+
+// Re-export MuxStateMachine for use in other modules
+pub use stm::mux::MuxStateMachine;
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index 9dfb44852..c92b0ca10 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -31,6 +31,15 @@ where
inner: UnsafeCell<WriteHandle<T, O>>,
}
+// Manual `Debug` impl: prints only the type name as a non-exhaustive struct
+// (`WriteCell { .. }`); the `inner` `UnsafeCell` field is not displayed.
+impl<T, O> std::fmt::Debug for WriteCell<T, O>
+where
+ T: Absorb<O>,
+{
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("WriteCell").finish_non_exhaustive()
+ }
+}
+
impl<T, O> WriteCell<T, O>
where
T: Absorb<O>,
@@ -53,11 +62,12 @@ where
}
/// Parses type-erased input into a command. Macro-generated.
+/// Returns `Ok(cmd)` if applicable, `Err(input)` to pass ownership back.
pub trait Command {
type Cmd;
type Input;
- fn parse(input: &Self::Input) -> Option<Self::Cmd>;
+ fn parse(input: Self::Input) -> Result<Self::Cmd, Self::Input>;
}
/// Handles commands. User-implemented business logic.
@@ -65,6 +75,7 @@ pub trait Handler: Command {
fn handle(&mut self, cmd: &Self::Cmd);
}
+#[derive(Debug)]
pub struct LeftRight<T, C>
where
T: Absorb<C>,
@@ -100,17 +111,18 @@ where
}
/// Public interface for state machines.
+/// Returns `Ok(output)` if applicable, `Err(input)` to pass ownership back.
pub trait State {
type Output;
type Input;
- fn apply(&self, input: &Self::Input) -> Option<Self::Output>;
+ fn apply(&self, input: Self::Input) -> Result<Self::Output, Self::Input>;
}
pub trait StateMachine {
type Input;
type Output;
- fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>);
+ fn update(&self, input: Self::Input) -> Self::Output;
}
/// Generates a state machine with convention-based storage.
@@ -180,6 +192,7 @@ macro_rules! define_state {
)*
}
+ #[derive(Debug)]
pub struct $state {
inner: $crate::stm::LeftRight<[<$state Inner>], [<$state
Command>]>,
}
@@ -201,9 +214,10 @@ macro_rules! define_state {
type Input = <[<$state Inner>] as $crate::stm::Command>::Input;
type Output = ();
- fn apply(&self, input: &Self::Input) -> Option<Self::Output> {
- <[<$state Inner>] as $crate::stm::Command>::parse(input)
- .map(|cmd| self.inner.do_apply(cmd))
+ fn apply(&self, input: Self::Input) -> Result<Self::Output,
Self::Input> {
+ let cmd = <[<$state Inner>] as
$crate::stm::Command>::parse(input)?;
+ self.inner.do_apply(cmd);
+ Ok(())
}
}
@@ -211,20 +225,20 @@ macro_rules! define_state {
type Cmd = [<$state Command>];
type Input =
::iggy_common::message::Message<::iggy_common::header::PrepareHeader>;
- fn parse(input: &Self::Input) -> Option<Self::Cmd> {
+ fn parse(input: Self::Input) -> Result<Self::Cmd, Self::Input>
{
use ::iggy_common::BytesSerializable;
use ::iggy_common::header::Operation;
- let body = input.body_bytes();
match input.header().operation {
$(
Operation::$operation => {
- Some([<$state Command>]::$operation(
+ let body = input.body_bytes();
+ Ok([<$state Command>]::$operation(
$operation::from_bytes(body).unwrap()
))
},
)*
- _ => None,
+ _ => Err(input),
}
}
}
diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs
index 979ff3ccf..f68d99bf7 100644
--- a/core/metadata/src/stm/mux.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -20,6 +20,7 @@ use iggy_common::{header::PrepareHeader, message::Message};
use crate::stm::{State, StateMachine};
// MuxStateMachine that proxies to an tuple of variadic state machines
+#[derive(Debug)]
pub struct MuxStateMachine<T>
where
T: StateMachine,
@@ -43,8 +44,8 @@ where
type Input = T::Input;
type Output = T::Output;
- fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>) {
- self.inner.update(input, output);
+ fn update(&self, input: Self::Input) -> Self::Output {
+ self.inner.update(input)
}
}
@@ -66,12 +67,14 @@ macro_rules! variadic {
impl StateMachine for () {
type Input = Message<PrepareHeader>;
// TODO: Make sure that the `Output` matches to the output type of the
rest of list.
+ // TODO: Add a trait bound to the output that will allow us to get the
response in bytes.
type Output = ();
- fn update(&self, _input: &Self::Input, _output: &mut Vec<Self::Output>) {}
+ fn update(&self, _input: Self::Input) -> Self::Output {}
}
// Recursive case: process head and recurse on tail
+// No Clone bound needed - ownership passes through via Result
impl<O, S, Rest> StateMachine for variadic!(S, ...Rest)
where
S: State<Output = O>,
@@ -80,11 +83,11 @@ where
type Input = Rest::Input;
type Output = O;
- fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>) {
- if let Some(result) = self.0.apply(input) {
- output.push(result);
+ fn update(&self, input: Self::Input) -> Self::Output {
+ match self.0.apply(input) {
+ Ok(result) => result,
+ Err(input) => self.1.update(input),
}
- self.1.update(input, output)
}
}
@@ -104,8 +107,7 @@ mod tests {
let mux = MuxStateMachine::new(variadic!(users, streams));
let input = Message::new(std::mem::size_of::<PrepareHeader>());
- let mut output = Vec::new();
- mux.update(&input, &mut output);
+ mux.update(input);
}
}
diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs
index 5106d6122..d1a3da212 100644
--- a/core/metadata/src/stm/stream.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -182,6 +182,7 @@ define_state! {
DeletePartitions
]
}
+
impl_absorb!(StreamsInner, StreamsCommand);
impl StreamsInner {
diff --git a/core/simulator/Cargo.toml b/core/simulator/Cargo.toml
new file mode 100644
index 000000000..e75769201
--- /dev/null
+++ b/core/simulator/Cargo.toml
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "simulator"
+version = "0.1.0"
+edition = "2024"
+
+[dependencies]
+bytemuck = { workspace = true }
+bytes = { workspace = true }
+consensus = { path = "../consensus" }
+futures = { workspace = true }
+iggy_common = { path = "../common" }
+journal = { path = "../journal" }
+message_bus = { path = "../message_bus" }
+metadata = { path = "../metadata" }
diff --git a/core/simulator/src/bus.rs b/core/simulator/src/bus.rs
new file mode 100644
index 000000000..b4dc1e52a
--- /dev/null
+++ b/core/simulator/src/bus.rs
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use iggy_common::{IggyError, header::GenericHeader, message::Message};
+use message_bus::MessageBus;
+use std::collections::{HashMap, VecDeque};
+use std::ops::Deref;
+use std::sync::{Arc, Mutex};
+
+/// Message envelope for tracking sender/recipient.
+///
+/// One envelope is queued on the bus for every message sent through it.
+#[derive(Debug, Clone)]
+pub struct Envelope {
+ /// Sending replica, if known. The send paths in this file always store `None`.
+ pub from_replica: Option<u8>,
+ /// Destination replica; set by `send_to_replica`, `None` for client-bound messages.
+ pub to_replica: Option<u8>,
+ /// Destination client; set by `send_to_client`, `None` for replica-bound messages.
+ pub to_client: Option<u128>,
+ /// The message being delivered.
+ pub message: Message<GenericHeader>,
+}
+
+// TODO: Proper bus with a `Network` component which would simulate sending packets.
+// TigerBeetle handles this by having a list of "buses" and calling callbacks for
+// clients when a response is sent.
+// This requires self-referential structs (as message_bus has to store a collection
+// of other buses), which is overcomplicated.
+// I think the way we could handle that is by having a dedicated collection for
+// client responses (clients_table).
+/// In-memory message bus: registries of known clients and replicas plus a
+/// FIFO queue of pending envelopes. Each field sits behind its own `Mutex`.
+#[derive(Debug, Default)]
+pub struct MemBus {
+ // Registered client ids; the map is used as a set (values are `()`).
+ clients: Mutex<HashMap<u128, ()>>,
+ // Registered replica ids; the map is used as a set (values are `()`).
+ replicas: Mutex<HashMap<u8, ()>>,
+ // Messages sent but not yet received; pushed at the back, popped from the front.
+ pending_messages: Mutex<VecDeque<Envelope>>,
+}
+
+impl MemBus {
+    /// Creates a bus with no registered clients or replicas and an empty queue.
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Pops the oldest pending envelope, or returns `None` when the queue is empty.
+    pub fn receive(&self) -> Option<Envelope> {
+        let mut queue = self.pending_messages.lock().unwrap();
+        queue.pop_front()
+    }
+}
+
+impl MessageBus for MemBus {
+    type Client = u128;
+    type Replica = u8;
+    type Data = Message<GenericHeader>;
+    type Sender = ();
+
+    /// Registers `client`; returns `false` (and changes nothing) when it is
+    /// already registered.
+    fn add_client(&mut self, client: Self::Client, _sender: Self::Sender) -> bool {
+        let mut registry = self.clients.lock().unwrap();
+        if registry.contains_key(&client) {
+            false
+        } else {
+            registry.insert(client, ());
+            true
+        }
+    }
+
+    /// Unregisters `client`; returns whether it was registered.
+    fn remove_client(&mut self, client: Self::Client) -> bool {
+        let removed = self.clients.lock().unwrap().remove(&client);
+        removed.is_some()
+    }
+
+    /// Registers `replica`; returns `false` (and changes nothing) when it is
+    /// already registered.
+    fn add_replica(&mut self, replica: Self::Replica) -> bool {
+        let mut registry = self.replicas.lock().unwrap();
+        if registry.contains_key(&replica) {
+            false
+        } else {
+            registry.insert(replica, ());
+            true
+        }
+    }
+
+    /// Unregisters `replica`; returns whether it was registered.
+    fn remove_replica(&mut self, replica: Self::Replica) -> bool {
+        let removed = self.replicas.lock().unwrap().remove(&replica);
+        removed.is_some()
+    }
+
+    /// Queues `message` for a registered client.
+    ///
+    /// Returns `IggyError::ClientNotFound` when `client_id` is not registered.
+    async fn send_to_client(
+        &self,
+        client_id: Self::Client,
+        message: Self::Data,
+    ) -> Result<(), IggyError> {
+        let registered = self.clients.lock().unwrap().contains_key(&client_id);
+        if !registered {
+            return Err(IggyError::ClientNotFound(client_id as u32));
+        }
+
+        let envelope = Envelope {
+            from_replica: None,
+            to_replica: None,
+            to_client: Some(client_id),
+            message,
+        };
+        self.pending_messages.lock().unwrap().push_back(envelope);
+
+        Ok(())
+    }
+
+    /// Queues `message` for a registered replica.
+    ///
+    /// Returns `IggyError::ResourceNotFound` when `replica` is not registered.
+    async fn send_to_replica(
+        &self,
+        replica: Self::Replica,
+        message: Self::Data,
+    ) -> Result<(), IggyError> {
+        let registered = self.replicas.lock().unwrap().contains_key(&replica);
+        if !registered {
+            return Err(IggyError::ResourceNotFound(format!("Replica {}", replica)));
+        }
+
+        let envelope = Envelope {
+            from_replica: None,
+            to_replica: Some(replica),
+            to_client: None,
+            message,
+        };
+        self.pending_messages.lock().unwrap().push_back(envelope);
+
+        Ok(())
+    }
+}
+
+/// Newtype wrapper for shared MemBus that implements MessageBus.
+///
+/// Cloning copies the `Arc`, so every clone refers to the same underlying `MemBus`.
+#[derive(Debug, Clone)]
+pub struct SharedMemBus(pub Arc<MemBus>);
+
+// Dereferences to the wrapped `MemBus`, exposing its methods on the wrapper.
+impl Deref for SharedMemBus {
+ type Target = MemBus;
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+impl MessageBus for SharedMemBus {
+    type Client = u128;
+    type Replica = u8;
+    type Data = Message<GenericHeader>;
+    type Sender = ();
+
+    /// Registers `client` on the shared bus; returns `true` when it was not
+    /// registered before.
+    fn add_client(&mut self, client: Self::Client, sender: Self::Sender) -> bool {
+        let mut registry = self.0.clients.lock().unwrap();
+        registry.insert(client, sender).is_none()
+    }
+
+    /// Unregisters `client`; returns whether it was registered.
+    fn remove_client(&mut self, client: Self::Client) -> bool {
+        let mut registry = self.0.clients.lock().unwrap();
+        registry.remove(&client).is_some()
+    }
+
+    /// Registers `replica` on the shared bus; returns `true` when it was not
+    /// registered before.
+    fn add_replica(&mut self, replica: Self::Replica) -> bool {
+        let mut registry = self.0.replicas.lock().unwrap();
+        registry.insert(replica, ()).is_none()
+    }
+
+    /// Unregisters `replica`; returns whether it was registered.
+    fn remove_replica(&mut self, replica: Self::Replica) -> bool {
+        let mut registry = self.0.replicas.lock().unwrap();
+        registry.remove(&replica).is_some()
+    }
+
+    /// Forwards to `MemBus::send_to_client` on the wrapped bus.
+    async fn send_to_client(
+        &self,
+        client_id: Self::Client,
+        message: Self::Data,
+    ) -> Result<(), IggyError> {
+        let inner: &MemBus = &self.0;
+        inner.send_to_client(client_id, message).await
+    }
+
+    /// Forwards to `MemBus::send_to_replica` on the wrapped bus.
+    async fn send_to_replica(
+        &self,
+        replica: Self::Replica,
+        message: Self::Data,
+    ) -> Result<(), IggyError> {
+        let inner: &MemBus = &self.0;
+        inner.send_to_replica(replica, message).await
+    }
+}
diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs
new file mode 100644
index 000000000..b2f57c152
--- /dev/null
+++ b/core/simulator/src/client.rs
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use iggy_common::{
+ BytesSerializable, Identifier,
+ create_stream::CreateStream,
+ delete_stream::DeleteStream,
+ header::{Operation, RequestHeader},
+ message::Message,
+};
+use std::cell::Cell;
+
+// TODO: Proper client which implements the full client SDK API
+/// Minimal simulated client that builds request messages for the simulator.
+pub struct SimClient {
+ // Identifier written into the `client` field of every request header.
+ client_id: u128,
+ // Next request number to hand out; starts at 0 and grows by one per request.
+ request_counter: Cell<u64>,
+}
+
+impl SimClient {
+    /// Creates a client with the given id whose request numbering starts at 0.
+    pub fn new(client_id: u128) -> Self {
+        let request_counter = Cell::new(0);
+        Self {
+            client_id,
+            request_counter,
+        }
+    }
+
+    /// Returns the current request number and advances the counter by one.
+    fn next_request_number(&self) -> u64 {
+        let issued = self.request_counter.get();
+        // `Cell::replace` stores the new value and hands back the previous one.
+        self.request_counter.replace(issued + 1)
+    }
+
+    /// Builds a `CreateStream` request for a stream called `name`.
+    pub fn create_stream(&self, name: &str) -> Message<RequestHeader> {
+        let command = CreateStream {
+            name: name.to_string(),
+        };
+        self.build_request(Operation::CreateStream, command.to_bytes())
+    }
+
+    /// Builds a `DeleteStream` request addressing the stream by `name`.
+    ///
+    /// Panics if `name` cannot be turned into a named `Identifier`.
+    pub fn delete_stream(&self, name: &str) -> Message<RequestHeader> {
+        let stream_id = Identifier::named(name).unwrap();
+        let command = DeleteStream { stream_id };
+        self.build_request(Operation::DeleteStream, command.to_bytes())
+    }
+
+    /// Serializes a request header followed by `payload` into a single message.
+    fn build_request(&self, operation: Operation, payload: bytes::Bytes) -> Message<RequestHeader> {
+        let total_size = std::mem::size_of::<RequestHeader>() + payload.len();
+
+        let header = RequestHeader {
+            command: iggy_common::header::Command2::Request,
+            operation,
+            size: total_size as u32,
+            cluster: 0, // TODO: Get from config
+            checksum: 0,
+            checksum_body: 0,
+            epoch: 0,
+            view: 0,
+            release: 0,
+            protocol: 0,
+            replica: 0,
+            reserved_frame: [0; 12],
+            client: self.client_id,
+            request_checksum: 0,
+            timestamp: 0, // TODO: Use actual timestamp
+            request: self.next_request_number(),
+            ..Default::default()
+        };
+
+        // Header bytes first, payload bytes directly after.
+        let mut frame = Vec::with_capacity(total_size);
+        for part in [bytemuck::bytes_of(&header), payload.as_ref()] {
+            frame.extend_from_slice(part);
+        }
+
+        Message::<RequestHeader>::from_bytes(bytes::Bytes::from(frame))
+            .expect("failed to build request message")
+    }
+}
diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs
new file mode 100644
index 000000000..3dbdac54f
--- /dev/null
+++ b/core/simulator/src/deps.rs
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::bus::SharedMemBus;
+use bytes::Bytes;
+use consensus::VsrConsensus;
+use iggy_common::header::PrepareHeader;
+use iggy_common::message::Message;
+use journal::{Journal, JournalHandle, Storage};
+use metadata::stm::consumer_group::ConsumerGroups;
+use metadata::stm::stream::Streams;
+use metadata::stm::user::Users;
+use metadata::{IggyMetadata, MuxStateMachine, variadic};
+use std::cell::{Cell, RefCell, UnsafeCell};
+use std::collections::HashMap;
+
+/// In-memory storage backend for testing/simulation
+#[derive(Debug, Default)]
+pub struct MemStorage {
+ // All bytes written so far, in write order.
+ data: RefCell<Vec<u8>>,
+ // Running total of bytes written; advanced by each `write`.
+ offset: Cell<u64>,
+}
+
+impl Storage for MemStorage {
+    type Buffer = Vec<u8>;
+
+    /// Appends `buf` to the in-memory data and returns the number of bytes written.
+    async fn write(&self, buf: Self::Buffer) -> usize {
+        let len = buf.len();
+        self.data.borrow_mut().extend_from_slice(&buf);
+        self.offset.set(self.offset.get() + len as u64);
+        len
+    }
+
+    /// Fills `buffer` with the bytes stored at `offset..offset + buffer.len()`.
+    ///
+    /// When that range is not fully contained in the stored data, `buffer` is
+    /// returned unchanged. The range end is computed with `checked_add`, so an
+    /// `offset` near `usize::MAX` is treated as out of range instead of
+    /// overflowing.
+    async fn read(&self, offset: usize, mut buffer: Self::Buffer) -> Self::Buffer {
+        let data = self.data.borrow();
+        let requested = offset
+            .checked_add(buffer.len())
+            .and_then(|end| data.get(offset..end));
+        if let Some(bytes) = requested {
+            buffer.copy_from_slice(bytes);
+        }
+        buffer
+    }
+}
+
+// TODO: Replace with actual Journal, the only thing that we will need to change
+// is the `Storage` impl for an in-memory one.
+/// Generic in-memory journal implementation for testing/simulation
+pub struct SimJournal<S: Storage> {
+ // Backing store that receives the raw bytes of every appended entry.
+ storage: S,
+ // Headers of appended entries, keyed by `header.op`.
+ headers: UnsafeCell<HashMap<u64, PrepareHeader>>,
+ // Storage offset recorded for each appended entry, keyed by `header.op`.
+ offsets: UnsafeCell<HashMap<u64, usize>>,
+}
+
+impl<S: Storage + Default> Default for SimJournal<S> {
+    /// Builds an empty journal on top of a default-constructed storage.
+    fn default() -> Self {
+        let storage = S::default();
+        let headers = UnsafeCell::new(HashMap::new());
+        let offsets = UnsafeCell::new(HashMap::new());
+        Self {
+            storage,
+            headers,
+            offsets,
+        }
+    }
+}
+
+impl<S: Storage> std::fmt::Debug for SimJournal<S> {
+    /// Prints fixed placeholders for every field; `S` is not required to be `Debug`.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut out = f.debug_struct("SimJournal");
+        out.field("storage", &"<Storage>");
+        out.field("headers", &"<UnsafeCell>");
+        out.field("offsets", &"<UnsafeCell>");
+        out.finish()
+    }
+}
+
+impl<S: Storage<Buffer = Vec<u8>>> Journal<S> for SimJournal<S> {
+    type Header = PrepareHeader;
+    type Entry = Message<PrepareHeader>;
+
+    /// Loads the stored entry whose op equals `header.op`.
+    ///
+    /// Returns `None` when no entry with that op has been appended. Panics if
+    /// the bytes read back from storage do not form a valid message.
+    async fn entry(&self, header: &Self::Header) -> Option<Self::Entry> {
+        // Copy what we need out of the maps before awaiting, so no reference
+        // into the `UnsafeCell`s is held across the storage read.
+        // SAFETY (assumed): the journal is only used from one thread and no
+        // mutable reference to these maps is live here — TODO confirm.
+        let (size, offset) = {
+            let headers = unsafe { &*self.headers.get() };
+            let offsets = unsafe { &*self.offsets.get() };
+            let stored = headers.get(&header.op)?;
+            (stored.size as usize, *offsets.get(&stored.op)?)
+        };
+
+        let buffer = self.storage.read(offset, vec![0; size]).await;
+        let message =
+            Message::from_bytes(Bytes::from(buffer)).expect("simulator: bytes should be valid");
+        Some(message)
+    }
+
+    /// Returns the header stored for op `header.op - 1`, or `None` for op 0
+    /// or when that op is not in the journal.
+    fn previous_header(&self, header: &Self::Header) -> Option<&Self::Header> {
+        let previous_op = header.op.checked_sub(1)?;
+        // SAFETY (assumed): see `entry` — single-threaded use, no live mutable
+        // reference to the map — TODO confirm.
+        unsafe { &*self.headers.get() }.get(&previous_op)
+    }
+
+    /// Appends `entry` to storage and records its header and start offset
+    /// under `header.op`.
+    ///
+    /// The recorded offset is the position where the entry's bytes BEGIN,
+    /// which is what `entry` passes to `Storage::read`. It is derived as the
+    /// largest `start + size` over the entries already recorded, i.e. the end
+    /// of the most recently written entry. `HashMap` iteration order is
+    /// unspecified, so "the last value in the map" must not be used for this.
+    ///
+    /// Assumes the storage appends sequentially and that every entry occupies
+    /// exactly `header.size` bytes — the same assumption `entry` makes when it
+    /// sizes its read buffer.
+    async fn append(&self, entry: Self::Entry) {
+        let header = *entry.header();
+
+        // Linear scan per append; acceptable for the simulator's small journals.
+        let start_offset = {
+            // SAFETY (assumed): see `entry` — TODO confirm.
+            let headers = unsafe { &*self.headers.get() };
+            let offsets = unsafe { &*self.offsets.get() };
+            offsets
+                .iter()
+                .filter_map(|(op, start)| headers.get(op).map(|h| start + h.size as usize))
+                .max()
+                .unwrap_or(0)
+        };
+
+        let _bytes_written = self.storage.write(entry.as_bytes().to_vec()).await;
+
+        // SAFETY (assumed): no other reference to these maps is live while the
+        // temporary mutable references exist — TODO confirm.
+        unsafe { &mut *self.headers.get() }.insert(header.op, header);
+        unsafe { &mut *self.offsets.get() }.insert(header.op, start_offset);
+    }
+
+    /// Returns the header stored under key `idx` (interpreted as an op number).
+    fn header(&self, idx: usize) -> Option<&Self::Header> {
+        // SAFETY (assumed): see `entry` — TODO confirm.
+        let headers = unsafe { &*self.headers.get() };
+        headers.get(&(idx as u64))
+    }
+}
+
+// The simulator journal acts as its own handle: `handle()` returns `self`.
+impl JournalHandle for SimJournal<MemStorage> {
+ type Storage = MemStorage;
+ type Target = SimJournal<MemStorage>;
+
+ fn handle(&self) -> &Self::Target {
+ self
+ }
+}
+
+/// Placeholder snapshot type for the simulator; it has no fields.
+#[derive(Debug, Default)]
+pub struct SimSnapshot {}
+
+/// Type aliases for simulator metadata
+// Multiplexed state machine over the `Users`, `Streams` and `ConsumerGroups` states.
+pub type SimMuxStateMachine = MuxStateMachine<variadic!(Users, Streams,
ConsumerGroups)>;
+// Metadata wired with the simulator's bus, journal, snapshot and state machine.
+pub type SimMetadata = IggyMetadata<
+ VsrConsensus<SharedMemBus>,
+ SimJournal<MemStorage>,
+ SimSnapshot,
+ SimMuxStateMachine,
+>;
+
+/// Placeholder for per-replica partition state; it has no fields.
+#[derive(Debug, Default)]
+pub struct ReplicaPartitions {}
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
new file mode 100644
index 000000000..d1f64fc09
--- /dev/null
+++ b/core/simulator/src/lib.rs
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod bus;
+pub mod client;
+pub mod deps;
+pub mod replica;
+
+use bus::MemBus;
+use iggy_common::header::{GenericHeader, ReplyHeader};
+use iggy_common::message::{Message, MessageBag};
+use message_bus::MessageBus;
+use metadata::Metadata;
+use replica::Replica;
+use std::sync::Arc;
+
+/// A set of simulated replicas that all share one in-memory message bus.
+#[derive(Debug)]
+pub struct Simulator {
+ /// Replicas, stored so that the replica with id `i` is at index `i`.
+ pub replicas: Vec<Replica>,
+ /// Bus shared by every replica; `step` takes pending messages from it.
+ pub message_bus: Arc<MemBus>,
+}
+
+impl Simulator {
+    /// Creates a simulator with `replica_count` replicas and a fresh bus on
+    /// which every id yielded by `clients` is registered.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `replica_count` does not fit in `u8` (see `with_message_bus`).
+    pub fn new(replica_count: usize, clients: impl Iterator<Item = u128>) -> Self {
+        let mut message_bus = MemBus::new();
+        for client in clients {
+            message_bus.add_client(client, ());
+        }
+
+        // Replica registration and construction is shared with `with_message_bus`.
+        Self::with_message_bus(replica_count, message_bus)
+    }
+
+    /// Creates a simulator with `replica_count` replicas on top of an existing
+    /// bus. Replica ids `0..replica_count` are registered on the bus before it
+    /// is shared with the replicas.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `replica_count` does not fit in `u8`. Replica ids are `u8`,
+    /// so a larger count used to be truncated silently by an `as u8` cast,
+    /// leaving the registered ids and the constructed replicas out of sync.
+    pub fn with_message_bus(replica_count: usize, mut message_bus: MemBus) -> Self {
+        let replica_total =
+            u8::try_from(replica_count).expect("replica_count must fit in u8 (replica ids are u8)");
+
+        for id in 0..replica_total {
+            message_bus.add_replica(id);
+        }
+
+        let message_bus = Arc::new(message_bus);
+        let replicas = (0..replica_total)
+            .map(|id| {
+                Replica::new(
+                    id,
+                    format!("replica-{}", id),
+                    Arc::clone(&message_bus),
+                    replica_total,
+                )
+            })
+            .collect();
+
+        Self {
+            replicas,
+            message_bus,
+        }
+    }
+}
+
+impl Simulator {
+ /// Processes at most one pending message from the bus.
+ ///
+ /// A client-bound message is converted into a reply and returned (panics if
+ /// the conversion fails). A replica-bound message is dispatched to that
+ /// replica when its id is a valid index into `replicas`; otherwise it is
+ /// dropped. Returns `None` in every case except the client-bound one.
+ pub async fn step(&self) -> Option<Message<ReplyHeader>> {
+ if let Some(envelope) = self.message_bus.receive() {
+ if let Some(_client_id) = envelope.to_client {
+ let reply: Message<ReplyHeader> = envelope
+ .message
+ .try_into_typed()
+ .expect("invalid message, wrong command type for an client
response");
+ return Some(reply);
+ }
+
+ if let Some(replica_id) = envelope.to_replica
+ && let Some(replica) = self.replicas.get(replica_id as usize)
+ {
+ self.dispatch_to_replica(replica, envelope.message).await;
+ }
+ }
+
+ None
+ }
+
+ /// Routes a message by the numeric value of its operation: values below 200
+ /// go to the metadata handlers, all others to the partition handlers.
+ // NOTE(review): 200 looks like the boundary between metadata and partition
+ // operation codes — confirm against the `Operation` enum.
+ async fn dispatch_to_replica(&self, replica: &Replica, message:
Message<GenericHeader>) {
+ let message: MessageBag = message.into();
+ let operation = match &message {
+ MessageBag::Request(message) => message.header().operation,
+ MessageBag::Prepare(message) => message.header().operation,
+ MessageBag::PrepareOk(message) => message.header().operation,
+ } as u8;
+
+ if operation < 200 {
+ self.dispatch_to_metadata_on_replica(replica, message).await;
+ } else {
+ self.dispatch_to_partition_on_replica(replica, message);
+ }
+ }
+
+ /// Hands the message to the matching metadata handler on `replica`:
+ /// `on_request`, `on_replicate` or `on_ack`.
+ async fn dispatch_to_metadata_on_replica(&self, replica: &Replica,
message: MessageBag) {
+ match message {
+ MessageBag::Request(request) => {
+ replica.metadata.on_request(request).await;
+ }
+ MessageBag::Prepare(prepare) => {
+ replica.metadata.on_replicate(prepare).await;
+ }
+ MessageBag::PrepareOk(prepare_ok) => {
+ replica.metadata.on_ack(prepare_ok).await;
+ }
+ }
+ }
+
+ /// Partition dispatch is not implemented yet: every arm panics via `todo!`.
+ fn dispatch_to_partition_on_replica(&self, replica: &Replica, message:
MessageBag) {
+ match message {
+ MessageBag::Request(request) => {
+ todo!(
+ "dispatch request to partition replica {}: operation={:?}",
+ replica.id,
+ request.header().operation
+ );
+ }
+ MessageBag::Prepare(prepare) => {
+ todo!(
+ "dispatch prepare to partition replica {}: operation={:?}",
+ replica.id,
+ prepare.header().operation
+ );
+ }
+ MessageBag::PrepareOk(prepare_ok) => {
+ todo!(
+ "dispatch prepare_ok to partition replica {}: op={}",
+ replica.id,
+ prepare_ok.header().op
+ );
+ }
+ }
+ }
+}
diff --git a/core/simulator/src/main.rs b/core/simulator/src/main.rs
new file mode 100644
index 000000000..995c717a3
--- /dev/null
+++ b/core/simulator/src/main.rs
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use iggy_common::header::ReplyHeader;
+use iggy_common::message::Message;
+use message_bus::MessageBus;
+use simulator::{Simulator, client::SimClient};
+use std::collections::VecDeque;
+use std::sync::{Arc, Mutex};
+
+/// Shared response queue for client replies
+#[derive(Default)]
+pub struct Responses {
+ // Replies in arrival order; the oldest is at the front.
+ queue: VecDeque<Message<ReplyHeader>>,
+}
+
+impl Responses {
+ /// Appends a reply at the back of the queue.
+ pub fn push(&mut self, msg: Message<ReplyHeader>) {
+ self.queue.push_back(msg);
+ }
+
+ /// Removes and returns the oldest reply, or `None` when the queue is empty.
+ pub fn pop(&mut self) -> Option<Message<ReplyHeader>> {
+ self.queue.pop_front()
+ }
+}
+
+/// Runs a three-replica simulation with one client.
+///
+/// A client thread sends `create_stream` and then `delete_stream` to replica 0,
+/// waiting for a reply after each. The main thread repeatedly steps the
+/// simulator, forwarding every reply to the shared queue, until the client
+/// thread has finished.
+fn main() {
+ let client_id: u128 = 1;
+ let leader: u8 = 0;
+ let sim = Simulator::new(3, std::iter::once(client_id));
+ let bus = sim.message_bus.clone();
+
+ // Responses queue
+ let responses = Arc::new(Mutex::new(Responses::default()));
+ let responses_clone = responses.clone();
+
+ // TODO: Scuffed client/simulator setup.
+ // We need a better interface on simulator
+ let client_handle = std::thread::spawn(move || {
+ futures::executor::block_on(async {
+ let client = SimClient::new(client_id);
+
+ let create_msg = client.create_stream("test-stream");
+ bus.send_to_replica(leader, create_msg.into_generic())
+ .await
+ .expect("failed to send create_stream");
+
+ // Poll the shared queue until the simulator thread delivers a reply.
+ loop {
+ if let Some(reply) = responses_clone.lock().unwrap().pop() {
+ println!("[client] Got create_stream reply: {:?}",
reply.header());
+ break;
+ }
+ std::thread::sleep(std::time::Duration::from_millis(1));
+ }
+
+ let delete_msg = client.delete_stream("test-stream");
+ bus.send_to_replica(leader, delete_msg.into_generic())
+ .await
+ .expect("failed to send delete_stream");
+
+ // Same polling loop for the delete_stream reply.
+ loop {
+ if let Some(reply) = responses_clone.lock().unwrap().pop() {
+ println!("[client] Got delete_stream reply: {:?}",
reply.header());
+ break;
+ }
+ std::thread::sleep(std::time::Duration::from_millis(1));
+ }
+ });
+ });
+
+ println!("[sim] Starting simulator loop");
+ // Step the simulator without pausing between steps until the client thread exits.
+ futures::executor::block_on(async {
+ loop {
+ if let Some(reply) = sim.step().await {
+ responses.lock().unwrap().push(reply);
+ }
+
+ if client_handle.is_finished() {
+ break;
+ }
+ }
+ });
+
+ client_handle.join().expect("client thread panicked");
+ println!("[sim] Simulator loop ended");
+}
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
new file mode 100644
index 000000000..74c26b72f
--- /dev/null
+++ b/core/simulator/src/replica.rs
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::bus::{MemBus, SharedMemBus};
+use crate::deps::{
+ MemStorage, ReplicaPartitions, SimJournal, SimMetadata,
SimMuxStateMachine, SimSnapshot,
+};
+use consensus::VsrConsensus;
+use metadata::stm::consumer_group::{ConsumerGroups, ConsumerGroupsInner};
+use metadata::stm::stream::{Streams, StreamsInner};
+use metadata::stm::user::{Users, UsersInner};
+use metadata::{IggyMetadata, variadic};
+use std::sync::Arc;
+
+/// One simulated replica: its metadata stack plus a handle to the shared bus.
+#[derive(Debug)]
+pub struct Replica {
+ /// Replica id; also passed to the consensus instance on construction.
+ pub id: u8,
+ /// Human-readable name supplied by the caller.
+ pub name: String,
+ /// Metadata component holding consensus, journal, snapshot and state machine.
+ pub metadata: SimMetadata,
+ /// Partition state; currently an empty placeholder.
+ pub partitions: ReplicaPartitions,
+ /// Handle to the in-memory bus this replica was created with.
+ pub bus: Arc<MemBus>,
+}
+
+impl Replica {
+    /// Builds a replica with empty user/stream/consumer-group state, an
+    /// initialized consensus instance bound to `bus`, and a default journal
+    /// and snapshot.
+    pub fn new(id: u8, name: String, bus: Arc<MemBus>, replica_count: u8) -> Self {
+        let users: Users = UsersInner::new().into();
+        let streams: Streams = StreamsInner::new().into();
+        let consumer_groups: ConsumerGroups = ConsumerGroupsInner::new().into();
+        let mux_stm = SimMuxStateMachine::new(variadic!(users, streams, consumer_groups));
+
+        let cluster_id: u128 = 1; // TODO: Make configurable
+        let shared_bus = SharedMemBus(Arc::clone(&bus));
+        let consensus = VsrConsensus::new(cluster_id, id, replica_count, shared_bus);
+        consensus.init();
+
+        let metadata = IggyMetadata {
+            consensus: Some(consensus),
+            journal: Some(SimJournal::<MemStorage>::default()),
+            snapshot: Some(SimSnapshot::default()),
+            mux_stm,
+        };
+
+        Self {
+            id,
+            name,
+            metadata,
+            partitions: ReplicaPartitions::default(),
+            bus,
+        }
+    }
+}