This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch partitions in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 1bf09e1353e3a93e1071750299797ad0d884055e Author: numinex <[email protected]> AuthorDate: Fri Feb 13 13:25:56 2026 +0100 temp v2 --- Cargo.lock | 1 + core/common/src/types/consensus/header.rs | 1 + core/partitions/src/iggy_partitions.rs | 145 ++++++++++++++++++++++++++++-- core/partitions/src/types.rs | 40 ++++++++- core/simulator/Cargo.toml | 1 + core/simulator/src/client.rs | 74 ++++++++++++++- core/simulator/src/deps.rs | 4 +- core/simulator/src/lib.rs | 31 +++---- core/simulator/src/main.rs | 32 ++++++- core/simulator/src/replica.rs | 40 +++++++-- rust_out | Bin 0 -> 3893768 bytes 11 files changed, 333 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5a7fdcd97..502883e00 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8454,6 +8454,7 @@ dependencies = [ "journal", "message_bus", "metadata", + "partitions", ] [[package]] diff --git a/core/common/src/types/consensus/header.rs b/core/common/src/types/consensus/header.rs index f7f64969d..bab1812c6 100644 --- a/core/common/src/types/consensus/header.rs +++ b/core/common/src/types/consensus/header.rs @@ -156,6 +156,7 @@ impl ConsensusHeader for GenericHeader { } } + #[repr(C)] #[derive(Debug, Clone, Copy)] pub struct RequestHeader { diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs index bd2881e05..998e30dd9 100644 --- a/core/partitions/src/iggy_partitions.rs +++ b/core/partitions/src/iggy_partitions.rs @@ -22,8 +22,8 @@ use crate::IggyPartition; use crate::Partitions; use consensus::{Consensus, Project, Sequencer, Status, VsrConsensus}; use iggy_common::{ - IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, INDEX_SIZE, PooledBuffer, Segment, - SegmentStorage, + IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, INDEX_SIZE, PartitionStats, PooledBuffer, + Segment, SegmentStorage, header::{Command2, GenericHeader, Operation, PrepareHeader, PrepareOkHeader, ReplyHeader}, message::Message, sharding::{IggyNamespace, LocalIdx, ShardId}, @@ -33,6 +33,7 
@@ use message_bus::MessageBus; use std::cell::UnsafeCell; use std::collections::HashMap; use std::sync::atomic::Ordering; +use std::sync::Arc; use tracing::{debug, warn}; /// Per-shard collection of all partitions. @@ -228,6 +229,99 @@ impl<C> IggyPartitions<C> { &mut self.partitions_mut()[idx] } + /// Initialize a new partition with in-memory storage (for testing/simulation). + /// + /// This is a simplified version that doesn't create file-backed storage. + /// Use `init_partition()` for production use with real files. + /// + /// TODO: Make the log generic over its storage backend to support both + /// in-memory (for testing) and file-backed (for production) storage without + /// needing separate initialization methods. + pub fn init_partition_in_memory(&mut self, namespace: IggyNamespace) -> LocalIdx { + // Check if already initialized + if let Some(idx) = self.local_idx(&namespace) { + return idx; + } + + // Create initial segment with default (in-memory) storage + let start_offset = 0; + let segment = Segment::new(start_offset, self.config.segment_size); + let storage = SegmentStorage::default(); + + // Create partition with initialized log + let stats = Arc::new(PartitionStats::default()); + let mut partition = IggyPartition::new(stats.clone()); + partition.log.add_persisted_segment(segment, storage); + partition.offset.store(start_offset, Ordering::Relaxed); + partition.dirty_offset.store(start_offset, Ordering::Relaxed); + partition.should_increment_offset = false; + partition.stats.increment_segments_count(1); + + // Insert and return local index + self.insert(namespace, partition) + } + + /// Initialize a new partition with file-backed storage. + /// + /// This is the data plane initialization - creates the partition structure, + /// initial segment, and storage. Skips the control plane metadata broadcasting. + /// + /// Corresponds to the "INITIATE PARTITION" phase in the server's flow: + /// 1. 
Control plane: create PartitionMeta (SKIPPED in this method) + /// 2. Control plane: broadcast to shards (SKIPPED in this method) + /// 3. Data plane: INITIATE PARTITION (THIS METHOD) + /// + /// Idempotent - returns existing LocalIdx if partition already exists. + pub async fn init_partition(&mut self, namespace: IggyNamespace) -> LocalIdx { + // Check if already initialized + if let Some(idx) = self.local_idx(&namespace) { + return idx; + } + + // Create initial segment with storage + let start_offset = 0; + let segment = Segment::new(start_offset, self.config.segment_size); + + // TODO: Waiting for issue to move server config to shared module. + // Once complete, paths will come from proper base_path/streams_path/etc config fields. + let messages_path = self.config.get_messages_path( + namespace.stream_id(), + namespace.topic_id(), + namespace.partition_id(), + start_offset, + ); + let index_path = self.config.get_index_path( + namespace.stream_id(), + namespace.topic_id(), + namespace.partition_id(), + start_offset, + ); + + let storage = SegmentStorage::new( + &messages_path, + &index_path, + 0, // messages_size (new segment) + 0, // indexes_size (new segment) + self.config.enforce_fsync, + self.config.enforce_fsync, + false, // file_exists (new segment) + ) + .await + .expect("Failed to create segment storage"); + + // Create partition with initialized log + let stats = Arc::new(PartitionStats::default()); + let mut partition = IggyPartition::new(stats.clone()); + partition.log.add_persisted_segment(segment, storage); + partition.offset.store(start_offset, Ordering::Relaxed); + partition.dirty_offset.store(start_offset, Ordering::Relaxed); + partition.should_increment_offset = false; + partition.stats.increment_segments_count(1); + + // Insert and return local index + self.insert(namespace, partition) + } + } impl<B> Partitions<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B>> @@ -480,8 +574,9 @@ where /// Append a batch to a partition's journal with 
offset assignment. /// - /// Only writes to the journal — segment metadata (timestamps, end_offset, - /// current_position) is updated later when the journal is flushed to disk. + /// Updates `segment.current_position` (logical position for indexing) but + /// not `segment.end_offset` or `segment.end_timestamp` (committed state). + /// Those are updated during commit. /// /// Uses `dirty_offset` for offset assignment so that multiple prepares /// can be pipelined before any commit. @@ -529,6 +624,12 @@ where .store(last_dirty_offset, Ordering::Relaxed); } + // Update segment.current_position for next prepare_for_persistence call. + // This is the logical position (includes unflushed journal data). + // segment.size is only updated after actual persist (in persist_frozen_batches_to_disk). + let segment_index = partition.log.segments().len() - 1; + partition.log.segments_mut()[segment_index].current_position += batch_messages_size; + // Update journal tracking metadata. let journal = partition.log.journal_mut(); journal.info.messages_count += batch_messages_count; @@ -648,6 +749,7 @@ where } // 1. Update segment metadata from journal state. + // Note: segment.current_position is already updated in append_batch (prepare phase). let segment_index = partition.log.segments().len() - 1; let segment = &mut partition.log.segments_mut()[segment_index]; @@ -656,7 +758,6 @@ where } segment.end_timestamp = journal_info.end_timestamp; segment.end_offset = journal_info.current_offset; - segment.current_position += journal_info.size.as_bytes_u64() as u32; // 2. Update stats. partition @@ -812,11 +913,39 @@ where let segment = Segment::new(start_offset, self.config.segment_size); - // TODO: Create actual storage with file paths from config. - // For now create an empty storage placeholder. - let storage = SegmentStorage::default(); + // TODO: Waiting for issue to move server config to shared module. 
+ // Once complete, paths will come from proper base_path/streams_path/etc config fields. + let messages_path = self.config.get_messages_path( + namespace.stream_id(), + namespace.topic_id(), + namespace.partition_id(), + start_offset, + ); + let index_path = self.config.get_index_path( + namespace.stream_id(), + namespace.topic_id(), + namespace.partition_id(), + start_offset, + ); + + let storage = SegmentStorage::new( + &messages_path, + &index_path, + 0, // messages_size (new segment) + 0, // indexes_size (new segment) + self.config.enforce_fsync, + self.config.enforce_fsync, + false, // file_exists (new segment) + ) + .await + .expect("Failed to create segment storage"); // Clear old segment's indexes. + // TODO: Waiting for issue to move server config to shared module. + // Once complete, conditionally clear based on cache_indexes config: + // if !matches!(self.config.cache_indexes, CacheIndexesConfig::All) { + // partition.log.indexes_mut()[old_segment_index] = None; + // } partition.log.indexes_mut()[old_segment_index] = None; // Close writers for the sealed segment. diff --git a/core/partitions/src/types.rs b/core/partitions/src/types.rs index 3496c28f7..886cb715e 100644 --- a/core/partitions/src/types.rs +++ b/core/partitions/src/types.rs @@ -161,7 +161,7 @@ impl Default for PartitionOffsets { /// /// Mirrors the relevant fields from the server's `PartitionConfig` and /// `SegmentConfig` (`core/server/src/configs/system.rs`). -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] pub struct PartitionsConfig { /// Flush journal to disk when it accumulates this many messages. pub messages_required_to_save: u32, @@ -172,3 +172,41 @@ pub struct PartitionsConfig { /// Maximum size of a single segment before rotation. pub segment_size: IggyByteSize, } + +impl PartitionsConfig { + /// Constructs the file path for segment messages. + /// + /// TODO: This is a stub waiting for completion of issue to move server config + /// to shared module. 
Real implementation should use: + /// `{base_path}/{streams_path}/{stream_id}/{topics_path}/{topic_id}/{partitions_path}/{partition_id}/{start_offset:0>20}.log` + pub fn get_messages_path( + &self, + stream_id: usize, + topic_id: usize, + partition_id: usize, + start_offset: u64, + ) -> String { + format!( + "/tmp/iggy_stub/streams/{}/topics/{}/partitions/{}/{:0>20}.log", + stream_id, topic_id, partition_id, start_offset + ) + } + + /// Constructs the file path for segment indexes. + /// + /// TODO: This is a stub waiting for completion of issue to move server config + /// to shared module. Real implementation should use: + /// `{base_path}/{streams_path}/{stream_id}/{topics_path}/{topic_id}/{partitions_path}/{partition_id}/{start_offset:0>20}.index` + pub fn get_index_path( + &self, + stream_id: usize, + topic_id: usize, + partition_id: usize, + start_offset: u64, + ) -> String { + format!( + "/tmp/iggy_stub/streams/{}/topics/{}/partitions/{}/{:0>20}.index", + stream_id, topic_id, partition_id, start_offset + ) + } +} diff --git a/core/simulator/Cargo.toml b/core/simulator/Cargo.toml index e75769201..b8e25866d 100644 --- a/core/simulator/Cargo.toml +++ b/core/simulator/Cargo.toml @@ -29,3 +29,4 @@ iggy_common = { path = "../common" } journal = { path = "../journal" } message_bus = { path = "../message_bus" } metadata = { path = "../metadata" } +partitions = { path = "../partitions" } diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs index b2f57c152..a8e5ccbb7 100644 --- a/core/simulator/src/client.rs +++ b/core/simulator/src/client.rs @@ -16,11 +16,12 @@ // under the License. 
use iggy_common::{ - BytesSerializable, Identifier, + BytesSerializable, INDEX_SIZE, Identifier, create_stream::CreateStream, delete_stream::DeleteStream, header::{Operation, RequestHeader}, message::Message, + sharding::IggyNamespace, }; use std::cell::Cell; @@ -62,6 +63,77 @@ impl SimClient { self.build_request(Operation::DeleteStream, payload) } + pub fn send_messages( + &self, + namespace: IggyNamespace, + messages: Vec<&[u8]>, + ) -> Message<RequestHeader> { + // Build batch: count | indexes | messages + let count = messages.len() as u32; + let mut indexes = Vec::with_capacity(count as usize * INDEX_SIZE); + let mut messages_buf = Vec::new(); + + let mut current_position = 0u32; + for msg in &messages { + // Write index: position (u32) + length (u32) + indexes.extend_from_slice(&current_position.to_le_bytes()); + indexes.extend_from_slice(&(msg.len() as u32).to_le_bytes()); + + // Append message + messages_buf.extend_from_slice(msg); + current_position += msg.len() as u32; + } + + // Build payload: count | indexes | messages + let mut payload = Vec::with_capacity(4 + indexes.len() + messages_buf.len()); + payload.extend_from_slice(&count.to_le_bytes()); + payload.extend_from_slice(&indexes); + payload.extend_from_slice(&messages_buf); + + self.build_request_with_namespace(Operation::SendMessages, bytes::Bytes::from(payload), namespace) + } + + fn build_request_with_namespace( + &self, + operation: Operation, + payload: bytes::Bytes, + namespace: IggyNamespace, + ) -> Message<RequestHeader> { + use bytes::Bytes; + + let header_size = std::mem::size_of::<RequestHeader>(); + let total_size = header_size + payload.len(); + + let header = RequestHeader { + command: iggy_common::header::Command2::Request, + operation, + size: total_size as u32, + cluster: 0, + checksum: 0, + checksum_body: 0, + epoch: 0, + view: 0, + release: 0, + protocol: 0, + replica: 0, + reserved_frame: [0; 12], + client: self.client_id, + request_checksum: 0, + timestamp: 0, + request: 
self.next_request_number(), + namespace: namespace.inner(), + ..Default::default() + }; + + let header_bytes = bytemuck::bytes_of(&header); + let mut buffer = Vec::with_capacity(total_size); + buffer.extend_from_slice(header_bytes); + buffer.extend_from_slice(&payload); + + Message::<RequestHeader>::from_bytes(Bytes::from(buffer)) + .expect("failed to build request message") + } + fn build_request(&self, operation: Operation, payload: bytes::Bytes) -> Message<RequestHeader> { use bytes::Bytes; diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs index b7d829e8f..6a798b4aa 100644 --- a/core/simulator/src/deps.rs +++ b/core/simulator/src/deps.rs @@ -155,5 +155,5 @@ pub type SimMetadata = IggyMetadata< SimMuxStateMachine, >; -#[derive(Debug, Default)] -pub struct ReplicaPartitions {} +/// Type alias for simulator partitions +pub type ReplicaPartitions = partitions::IggyPartitions<VsrConsensus<SharedMemBus>>; diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs index d1f64fc09..cacfe5adf 100644 --- a/core/simulator/src/lib.rs +++ b/core/simulator/src/lib.rs @@ -25,16 +25,23 @@ use iggy_common::header::{GenericHeader, ReplyHeader}; use iggy_common::message::{Message, MessageBag}; use message_bus::MessageBus; use metadata::Metadata; +use partitions::Partitions; use replica::Replica; use std::sync::Arc; -#[derive(Debug)] pub struct Simulator { pub replicas: Vec<Replica>, pub message_bus: Arc<MemBus>, } impl Simulator { + /// Initialize a partition on all replicas (in-memory for simulation) + pub fn init_partition(&mut self, namespace: iggy_common::sharding::IggyNamespace) { + for replica in &mut self.replicas { + replica.partitions.init_partition_in_memory(namespace); + } + } + pub fn new(replica_count: usize, clients: impl Iterator<Item = u128>) -> Self { let mut message_bus = MemBus::new(); for client in clients { @@ -119,7 +126,7 @@ impl Simulator { if operation < 200 { self.dispatch_to_metadata_on_replica(replica, message).await; } else { 
- self.dispatch_to_partition_on_replica(replica, message); + self.dispatch_to_partition_on_replica(replica, message).await; } } @@ -137,28 +144,16 @@ impl Simulator { } } - fn dispatch_to_partition_on_replica(&self, replica: &Replica, message: MessageBag) { + async fn dispatch_to_partition_on_replica(&self, replica: &Replica, message: MessageBag) { match message { MessageBag::Request(request) => { - todo!( - "dispatch request to partition replica {}: operation={:?}", - replica.id, - request.header().operation - ); + replica.partitions.on_request(request).await; } MessageBag::Prepare(prepare) => { - todo!( - "dispatch prepare to partition replica {}: operation={:?}", - replica.id, - prepare.header().operation - ); + replica.partitions.on_replicate(prepare).await; } MessageBag::PrepareOk(prepare_ok) => { - todo!( - "dispatch prepare_ok to partition replica {}: op={}", - replica.id, - prepare_ok.header().op - ); + replica.partitions.on_ack(prepare_ok).await; } } } diff --git a/core/simulator/src/main.rs b/core/simulator/src/main.rs index 995c717a3..8d519cdda 100644 --- a/core/simulator/src/main.rs +++ b/core/simulator/src/main.rs @@ -17,6 +17,7 @@ use iggy_common::header::ReplyHeader; use iggy_common::message::Message; +use iggy_common::sharding::IggyNamespace; use message_bus::MessageBus; use simulator::{Simulator, client::SimClient}; use std::collections::VecDeque; @@ -41,9 +42,16 @@ impl Responses { fn main() { let client_id: u128 = 1; let leader: u8 = 0; - let sim = Simulator::new(3, std::iter::once(client_id)); + let mut sim = Simulator::new(3, std::iter::once(client_id)); let bus = sim.message_bus.clone(); + // Hardcoded partition for testing: stream_id=1, topic_id=1, partition_id=0 + let test_namespace = IggyNamespace::new(1, 1, 0); + + // Initialize partition on all replicas + println!("[sim] Initializing test partition: {:?}", test_namespace); + sim.init_partition(test_namespace); + // Responses queue let responses = 
Arc::new(Mutex::new(Responses::default())); let responses_clone = responses.clone(); @@ -54,6 +62,28 @@ fn main() { futures::executor::block_on(async { let client = SimClient::new(client_id); + // Send some test messages to the partition + println!("[client] Sending messages to partition"); + let test_messages = vec![ + b"Hello, partition!".as_slice(), + b"Message 2".as_slice(), + b"Message 3".as_slice(), + ]; + + let send_msg = client.send_messages(test_namespace, test_messages); + bus.send_to_replica(leader, send_msg.into_generic()) + .await + .expect("failed to send messages"); + + loop { + if let Some(reply) = responses_clone.lock().unwrap().pop() { + println!("[client] Got send_messages reply: {:?}", reply.header()); + break; + } + std::thread::sleep(std::time::Duration::from_millis(1)); + } + + // Send metadata operations let create_msg = client.create_stream("test-stream"); bus.send_to_replica(leader, create_msg.into_generic()) .await diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs index 74c26b72f..a0d0fb54c 100644 --- a/core/simulator/src/replica.rs +++ b/core/simulator/src/replica.rs @@ -20,13 +20,15 @@ use crate::deps::{ MemStorage, ReplicaPartitions, SimJournal, SimMetadata, SimMuxStateMachine, SimSnapshot, }; use consensus::VsrConsensus; +use iggy_common::IggyByteSize; +use iggy_common::sharding::ShardId; use metadata::stm::consumer_group::{ConsumerGroups, ConsumerGroupsInner}; use metadata::stm::stream::{Streams, StreamsInner}; use metadata::stm::user::{Users, UsersInner}; use metadata::{IggyMetadata, variadic}; +use partitions::PartitionsConfig; use std::sync::Arc; -#[derive(Debug)] pub struct Replica { pub id: u8, pub name: String, @@ -43,24 +45,52 @@ impl Replica { let mux = SimMuxStateMachine::new(variadic!(users, streams, consumer_groups)); let cluster_id: u128 = 1; // TODO: Make configurable - let consensus = VsrConsensus::new( + let metadata_consensus = VsrConsensus::new( cluster_id, id, replica_count, 
SharedMemBus(Arc::clone(&bus)), ); - consensus.init(); + metadata_consensus.init(); + + // Create separate consensus instance for partitions + let partitions_consensus = VsrConsensus::new( + cluster_id, + id, + replica_count, + SharedMemBus(Arc::clone(&bus)), + ); + partitions_consensus.init(); + + // Configure partitions + let partitions_config = PartitionsConfig { + messages_required_to_save: 1000, + size_of_messages_required_to_save: IggyByteSize::from(4 * 1024 * 1024), + enforce_fsync: false, // Disable fsync for simulation + segment_size: IggyByteSize::from(1024 * 1024 * 1024), // 1GB segments + }; + + // Only replica 0 gets consensus (primary shard for now) + let partitions = if id == 0 { + ReplicaPartitions::new( + ShardId::new(id as u16), + partitions_config, + Some(partitions_consensus), + ) + } else { + ReplicaPartitions::new(ShardId::new(id as u16), partitions_config, None) + }; Self { id, name, metadata: IggyMetadata { - consensus: Some(consensus), + consensus: Some(metadata_consensus), journal: Some(SimJournal::<MemStorage>::default()), snapshot: Some(SimSnapshot::default()), mux_stm: mux, }, - partitions: ReplicaPartitions::default(), + partitions, bus, } } diff --git a/rust_out b/rust_out new file mode 100755 index 000000000..421fd8e25 Binary files /dev/null and b/rust_out differ
