hubcio commented on code in PR #2735:
URL: https://github.com/apache/iggy/pull/2735#discussion_r2812069101


##########
core/partitions/src/iggy_partitions.rs:
##########
@@ -116,4 +180,873 @@ impl IggyPartitions {
 
         Some(partition)
     }
+
+    /// Remove multiple partitions at once.
+    pub fn remove_many(&mut self, namespaces: &[IggyNamespace]) -> 
Vec<IggyPartition> {
+        namespaces.iter().filter_map(|ns| self.remove(ns)).collect()
+    }
+
+    /// Iterate over all namespaces owned by this shard.
+    pub fn namespaces(&self) -> impl Iterator<Item = &IggyNamespace> {
+        self.namespace_to_local.keys()
+    }
+
+    /// Iterate over all (namespace, partition) pairs.
+    pub fn iter(&self) -> impl Iterator<Item = (&IggyNamespace, 
&IggyPartition)> {
+        self.namespace_to_local
+            .iter()
+            .filter_map(|(ns, idx)| self.partitions().get(**idx).map(|p| (ns, 
p)))
+    }
+
+    /// Iterate over all (namespace, partition) pairs mutably.
+    pub fn iter_mut(&self) -> impl Iterator<Item = (&IggyNamespace, &mut 
IggyPartition)> {
+        let partitions = self.partitions_mut();
+        let partitions_ptr = partitions.as_mut_ptr();
+        let partitions_len = partitions.len();
+        self.namespace_to_local.iter().filter_map(move |(ns, idx)| {
+            let i = **idx;
+            if i < partitions_len {
+                // Safety: each LocalIdx is unique, so no two iterations alias 
the same element.
+                Some((ns, unsafe { &mut *partitions_ptr.add(i) }))
+            } else {
+                None
+            }
+        })
+    }
+
+    /// Get partition by namespace, initializing if not present.
+    pub fn get_or_init<F>(&mut self, namespace: IggyNamespace, init: F) -> 
&mut IggyPartition
+    where
+        F: FnOnce() -> IggyPartition,
+    {
+        // TODO: get_or_insert
+        if !self.namespace_to_local.contains_key(&namespace) {
+            let partition = init();
+            self.insert(namespace, partition);
+        }
+        let idx = *self.namespace_to_local[&namespace];
+        &mut self.partitions_mut()[idx]
+    }
+
+    /// Initialize a new partition with in-memory storage (for 
testing/simulation).
+    ///
+    /// This is a simplified version that doesn't create file-backed storage.
+    /// Use `init_partition()` for production use with real files.
+    ///
+    /// TODO: Make the log generic over its storage backend to support both
+    /// in-memory (for testing) and file-backed (for production) storage 
without
+    /// needing separate initialization methods.
+    pub fn init_partition_in_memory(&mut self, namespace: IggyNamespace) -> 
LocalIdx {
+        // Check if already initialized
+        if let Some(idx) = self.local_idx(&namespace) {
+            return idx;
+        }
+
+        // Create initial segment with default (in-memory) storage
+        let start_offset = 0;
+        let segment = Segment::new(start_offset, self.config.segment_size);
+        let storage = SegmentStorage::default();
+
+        // Create partition with initialized log
+        let stats = Arc::new(PartitionStats::default());
+        let mut partition = IggyPartition::new(stats.clone());
+        partition.log.add_persisted_segment(segment, storage);
+        partition.offset.store(start_offset, Ordering::Relaxed);
+        partition
+            .dirty_offset
+            .store(start_offset, Ordering::Relaxed);
+        partition.should_increment_offset = false;
+        partition.stats.increment_segments_count(1);
+
+        // Insert and return local index
+        self.insert(namespace, partition)
+    }
+
+    /// Initialize a new partition with file-backed storage.
+    ///
+    /// This is the data plane initialization - creates the partition 
structure,
+    /// initial segment, and storage. Skips the control plane metadata 
broadcasting.
+    ///
+    /// Corresponds to the "INITIATE PARTITION" phase in the server's flow:
+    /// 1. Control plane: create PartitionMeta (SKIPPED in this method)
+    /// 2. Control plane: broadcast to shards (SKIPPED in this method)
+    /// 3. Data plane: INITIATE PARTITION (THIS METHOD)
+    ///
+    /// Idempotent - returns existing LocalIdx if partition already exists.
+    pub async fn init_partition(&mut self, namespace: IggyNamespace) -> 
LocalIdx {
+        // Check if already initialized
+        if let Some(idx) = self.local_idx(&namespace) {
+            return idx;
+        }
+
+        // Create initial segment with storage
+        let start_offset = 0;
+        let segment = Segment::new(start_offset, self.config.segment_size);
+
+        // TODO: Waiting for issue to move server config to shared module.
+        // Once complete, paths will come from proper 
base_path/streams_path/etc config fields.
+        let messages_path = self.config.get_messages_path(
+            namespace.stream_id(),
+            namespace.topic_id(),
+            namespace.partition_id(),
+            start_offset,
+        );
+        let index_path = self.config.get_index_path(
+            namespace.stream_id(),
+            namespace.topic_id(),
+            namespace.partition_id(),
+            start_offset,
+        );
+
+        let storage = SegmentStorage::new(
+            &messages_path,
+            &index_path,
+            0, // messages_size (new segment)
+            0, // indexes_size (new segment)
+            self.config.enforce_fsync,
+            self.config.enforce_fsync,
+            false, // file_exists (new segment)
+        )
+        .await
+        .expect("Failed to create segment storage");
+
+        // Create partition with initialized log
+        let stats = Arc::new(PartitionStats::default());
+        let mut partition = IggyPartition::new(stats.clone());
+        partition.log.add_persisted_segment(segment, storage);
+        partition.offset.store(start_offset, Ordering::Relaxed);
+        partition
+            .dirty_offset
+            .store(start_offset, Ordering::Relaxed);
+        partition.should_increment_offset = false;
+        partition.stats.increment_segments_count(1);
+
+        // Insert and return local index
+        self.insert(namespace, partition)
+    }
+}
+
+impl<B> Partitions<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B>>
+where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+{
+    async fn on_request(&self, message: <VsrConsensus<B> as 
Consensus>::RequestMessage) {
+        let consensus = self.consensus.as_ref().unwrap();
+
+        debug!("handling partition request");
+        let prepare = message.project(consensus);
+        self.pipeline_prepare(prepare).await;
+    }
+
+    async fn on_replicate(&self, message: <VsrConsensus<B> as 
Consensus>::ReplicateMessage) {
+        let consensus = self.consensus.as_ref().unwrap();
+
+        let header = message.header();
+
+        assert_eq!(header.command, Command2::Prepare);
+
+        if !self.fence_old_prepare(&message) {
+            self.replicate(message.clone()).await;
+        } else {
+            warn!("received old prepare, not replicating");
+        }
+
+        // If syncing, ignore the replicate message.
+        if consensus.is_syncing() {
+            warn!(
+                replica = consensus.replica(),
+                "on_replicate: ignoring (sync)"
+            );
+            return;
+        }
+
+        let current_op = consensus.sequencer().current_sequence();
+
+        // If status is not normal, ignore the replicate.
+        if consensus.status() != Status::Normal {
+            warn!(
+                replica = consensus.replica(),
+                "on_replicate: ignoring (not normal state)"
+            );
+            return;
+        }
+
+        // If message from future view, we ignore the replicate.
+        if header.view > consensus.view() {
+            warn!(
+                replica = consensus.replica(),
+                "on_replicate: ignoring (newer view)"
+            );
+            return;
+        }
+
+        // If we are a follower, we advance the commit number.
+        if consensus.is_follower() {
+            consensus.advance_commit_number(message.header().commit);
+        }
+
+        //
+        assert_eq!(header.op, current_op + 1);

Review Comment:
   I think this should be a `debug_assert_eq!` rather than a plain `assert_eq!`, so that an op-sequence mismatch does not panic in production builds.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to