This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 153cb378c feat(metadata): impl Snapshot interface for Mux state
machine (#2675)
153cb378c is described below
commit 153cb378c49eaca0af03a1040fe2b921dfc54427
Author: Krishna Vishal <[email protected]>
AuthorDate: Fri Feb 13 16:52:20 2026 +0530
feat(metadata): impl Snapshot interface for Mux state machine (#2675)
The snapshot system has three layers:
`Snapshotable`: per-state-machine trait. Each of `StreamsInner`,
`UsersInner`, and `ConsumerGroupsInner` implements `to_snapshot()` /
`from_snapshot()` to convert between in-memory state and its
serializable snapshot representation.
`SnapshotContributor`: visitor trait using the same recursive variadic
tuple pattern as `StateMachine::update`. Walks
`(Users, (Streams, (ConsumerGroups, ())))` at compile time, collecting
each state machine's serialized section into a `Vec<SnapshotSection>`.
`MetadataSnapshot`: a top-level interface for the generic snapshot type
on `IggyMetadata`. Defines the full lifecycle: create (snapshot from
mux) → encode (to bytes) → decode (from bytes) → restore (back to mux).
---------
Co-authored-by: Grzegorz Koszyk
<[email protected]>
---
Cargo.lock | 2 +
core/metadata/Cargo.toml | 2 +
core/metadata/src/impls/metadata.rs | 55 +++++
core/metadata/src/stats/mod.rs | 28 +++
core/metadata/src/stm/consumer_group.rs | 146 +++++++++++-
core/metadata/src/stm/mod.rs | 14 ++
core/metadata/src/stm/mux.rs | 126 ++++++++++
core/metadata/src/stm/snapshot.rs | 402 ++++++++++++++++++++++++++++++++
core/metadata/src/stm/stream.rs | 190 ++++++++++++++-
core/metadata/src/stm/user.rs | 225 +++++++++++++++++-
10 files changed, 1184 insertions(+), 6 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index f16507888..d7ffd97b4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5712,6 +5712,8 @@ dependencies = [
"left-right",
"message_bus",
"paste",
+ "rmp-serde",
+ "serde",
"slab",
"tracing",
]
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
index 193bf5788..f5d81d5f8 100644
--- a/core/metadata/Cargo.toml
+++ b/core/metadata/Cargo.toml
@@ -35,5 +35,7 @@ journal = { workspace = true }
left-right = { workspace = true }
message_bus = { workspace = true }
paste = { workspace = true }
+rmp-serde = { workspace = true }
+serde = { workspace = true, features = ["derive"] }
slab = { workspace = true }
tracing = { workspace = true }
diff --git a/core/metadata/src/impls/metadata.rs
b/core/metadata/src/impls/metadata.rs
index c4e4b3da3..b3d9769b5 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
use crate::stm::StateMachine;
+use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, Snapshot,
SnapshotError};
use consensus::{Consensus, Project, Sequencer, Status, VsrConsensus};
use iggy_common::{
header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader,
ReplyHeader},
@@ -24,6 +25,60 @@ use journal::{Journal, JournalHandle};
use message_bus::MessageBus;
use tracing::{debug, warn};
+#[derive(Debug, Clone)]
+#[allow(unused)]
+pub struct IggySnapshot {
+ snapshot: MetadataSnapshot,
+}
+
+#[allow(unused)]
+impl IggySnapshot {
+ pub fn new(sequence_number: u64) -> Self {
+ Self {
+ snapshot: MetadataSnapshot::new(sequence_number),
+ }
+ }
+
+ pub fn snapshot(&self) -> &MetadataSnapshot {
+ &self.snapshot
+ }
+}
+
+impl Snapshot for IggySnapshot {
+ type Error = SnapshotError;
+ type SequenceNumber = u64;
+ type Timestamp = u64;
+ type Inner = MetadataSnapshot;
+
+ fn create<T>(stm: &T, sequence_number: u64) -> Result<Self, SnapshotError>
+ where
+ T: FillSnapshot<MetadataSnapshot>,
+ {
+ let mut snapshot = MetadataSnapshot::new(sequence_number);
+
+ stm.fill_snapshot(&mut snapshot)?;
+
+ Ok(Self { snapshot })
+ }
+
+ fn encode(&self) -> Result<Vec<u8>, SnapshotError> {
+ self.snapshot.encode()
+ }
+
+ fn decode(bytes: &[u8]) -> Result<Self, SnapshotError> {
+ let snapshot = MetadataSnapshot::decode(bytes)?;
+ Ok(Self { snapshot })
+ }
+
+ fn sequence_number(&self) -> u64 {
+ self.snapshot.sequence_number
+ }
+
+ fn created_at(&self) -> u64 {
+ self.snapshot.created_at
+ }
+}
+
pub trait Metadata<C>
where
C: Consensus,
diff --git a/core/metadata/src/stats/mod.rs b/core/metadata/src/stats/mod.rs
index 35b926176..f4cc3aedc 100644
--- a/core/metadata/src/stats/mod.rs
+++ b/core/metadata/src/stats/mod.rs
@@ -89,6 +89,20 @@ impl StreamStats {
self.zero_out_messages_count();
self.zero_out_segments_count();
}
+
+ pub fn load_for_snapshot(&self) -> (u64, u64, u32) {
+ (
+ self.size_bytes.load(Ordering::Relaxed),
+ self.messages_count.load(Ordering::Relaxed),
+ self.segments_count.load(Ordering::Relaxed),
+ )
+ }
+
+ pub fn store_from_snapshot(&self, size_bytes: u64, messages_count: u64,
segments_count: u32) {
+ self.size_bytes.store(size_bytes, Ordering::Relaxed);
+ self.messages_count.store(messages_count, Ordering::Relaxed);
+ self.segments_count.store(segments_count, Ordering::Relaxed);
+ }
}
#[derive(Default, Debug)]
@@ -219,6 +233,20 @@ impl TopicStats {
self.zero_out_messages_count();
self.zero_out_segments_count();
}
+
+ pub fn load_for_snapshot(&self) -> (u64, u64, u32) {
+ (
+ self.size_bytes.load(Ordering::Relaxed),
+ self.messages_count.load(Ordering::Relaxed),
+ self.segments_count.load(Ordering::Relaxed),
+ )
+ }
+
+ pub fn store_from_snapshot(&self, size_bytes: u64, messages_count: u64,
segments_count: u32) {
+ self.size_bytes.store(size_bytes, Ordering::Relaxed);
+ self.messages_count.store(messages_count, Ordering::Relaxed);
+ self.segments_count.store(segments_count, Ordering::Relaxed);
+ }
}
#[derive(Default, Debug)]
diff --git a/core/metadata/src/stm/consumer_group.rs
b/core/metadata/src/stm/consumer_group.rs
index 71babb412..5c039e0dc 100644
--- a/core/metadata/src/stm/consumer_group.rs
+++ b/core/metadata/src/stm/consumer_group.rs
@@ -16,14 +16,17 @@
// under the License.
use crate::stm::StateHandler;
-use crate::{collect_handlers, define_state};
+use crate::stm::snapshot::Snapshotable;
+use crate::{collect_handlers, define_state, impl_fill_restore};
+
use ahash::AHashMap;
use iggy_common::create_consumer_group::CreateConsumerGroup;
use iggy_common::delete_consumer_group::DeleteConsumerGroup;
use iggy_common::{IdKind, Identifier};
+use serde::{Deserialize, Serialize};
use slab::Slab;
use std::sync::Arc;
-use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::{AtomicUsize, Ordering};
#[derive(Debug, Clone)]
pub struct ConsumerGroupMember {
@@ -231,3 +234,142 @@ impl StateHandler for DeleteConsumerGroup {
}
}
}
+
+/// Consumer group member snapshot representation for serialization.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ConsumerGroupMemberSnapshot {
+ pub id: usize,
+ pub client_id: u32,
+ pub partitions: Vec<usize>,
+ pub partition_index: usize,
+}
+
+/// Consumer group snapshot representation for serialization.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ConsumerGroupSnapshot {
+ pub id: usize,
+ pub name: String,
+ pub partitions: Vec<usize>,
+ pub members: Vec<(usize, ConsumerGroupMemberSnapshot)>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ConsumerGroupsSnapshot {
+ pub items: Vec<(usize, ConsumerGroupSnapshot)>,
+ pub topic_index: Vec<((usize, usize), Vec<usize>)>,
+ pub topic_name_index: Vec<((String, String), Vec<usize>)>,
+}
+
+impl Snapshotable for ConsumerGroups {
+ type Snapshot = ConsumerGroupsSnapshot;
+
+ fn to_snapshot(&self) -> Self::Snapshot {
+ self.inner.read(|inner| {
+ let items: Vec<(usize, ConsumerGroupSnapshot)> = inner
+ .items
+ .iter()
+ .map(|(group_id, group)| {
+ let members: Vec<(usize, ConsumerGroupMemberSnapshot)> =
group
+ .members
+ .iter()
+ .map(|(member_id, member)| {
+ (
+ member_id,
+ ConsumerGroupMemberSnapshot {
+ id: member.id,
+ client_id: member.client_id,
+ partitions: member.partitions.clone(),
+ partition_index:
member.partition_index.load(Ordering::Relaxed),
+ },
+ )
+ })
+ .collect();
+
+ (
+ group_id,
+ ConsumerGroupSnapshot {
+ id: group.id,
+ name: group.name.to_string(),
+ partitions: group.partitions.clone(),
+ members,
+ },
+ )
+ })
+ .collect();
+
+ let topic_index: Vec<((usize, usize), Vec<usize>)> = inner
+ .topic_index
+ .iter()
+ .map(|(&k, v)| (k, v.clone()))
+ .collect();
+
+ let topic_name_index: Vec<((String, String), Vec<usize>)> = inner
+ .topic_name_index
+ .iter()
+ .map(|((s, t), v)| ((s.to_string(), t.to_string()), v.clone()))
+ .collect();
+
+ ConsumerGroupsSnapshot {
+ items,
+ topic_index,
+ topic_name_index,
+ }
+ })
+ }
+
+ fn from_snapshot(
+ snapshot: Self::Snapshot,
+ ) -> Result<Self, crate::stm::snapshot::SnapshotError> {
+ let mut name_index: AHashMap<Arc<str>, usize> = AHashMap::new();
+ let mut group_entries: Vec<(usize, ConsumerGroup)> = Vec::new();
+
+ for (slab_key, group_snap) in snapshot.items {
+ let member_entries: Vec<(usize, ConsumerGroupMember)> = group_snap
+ .members
+ .into_iter()
+ .map(|(member_key, member_snap)| {
+ let member = ConsumerGroupMember {
+ id: member_snap.id,
+ client_id: member_snap.client_id,
+ partitions: member_snap.partitions,
+ partition_index:
Arc::new(AtomicUsize::new(member_snap.partition_index)),
+ };
+ (member_key, member)
+ })
+ .collect();
+ let members: Slab<ConsumerGroupMember> =
member_entries.into_iter().collect();
+
+ let group_name: Arc<str> = Arc::from(group_snap.name.as_str());
+ let group = ConsumerGroup {
+ id: group_snap.id,
+ name: group_name.clone(),
+ partitions: group_snap.partitions,
+ members,
+ };
+
+ name_index.insert(group_name, slab_key);
+ group_entries.push((slab_key, group));
+ }
+
+ let items = group_entries.into_iter().collect();
+
+ let topic_index: AHashMap<(usize, usize), Vec<usize>> =
+ snapshot.topic_index.into_iter().collect();
+
+ let topic_name_index: AHashMap<(Arc<str>, Arc<str>), Vec<usize>> =
snapshot
+ .topic_name_index
+ .into_iter()
+ .map(|((s, t), v)| ((Arc::from(s.as_str()),
Arc::from(t.as_str())), v))
+ .collect();
+
+ let inner = ConsumerGroupsInner {
+ name_index,
+ topic_index,
+ topic_name_index,
+ items,
+ };
+ Ok(inner.into())
+ }
+}
+
+impl_fill_restore!(ConsumerGroups, consumer_groups);
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index 08bef35dd..d5f3d573e 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -17,6 +17,7 @@
pub mod consumer_group;
pub mod mux;
+pub mod snapshot;
pub mod stream;
pub mod user;
@@ -87,6 +88,19 @@ where
read: Arc<ReadHandle<T>>,
}
+impl<T, C> LeftRight<T, C>
+where
+ T: Absorb<C>,
+{
+ pub fn read<F, R>(&self, f: F) -> R
+ where
+ F: FnOnce(&T) -> R,
+ {
+ let guard = self.read.enter().expect("read handle should be
accessible");
+ f(&*guard)
+ }
+}
+
impl<T> From<T> for LeftRight<T, <T as Command>::Cmd>
where
T: Absorb<<T as Command>::Cmd> + Clone + Command,
diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs
index f68d99bf7..ae514e5c4 100644
--- a/core/metadata/src/stm/mux.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use crate::stm::snapshot::{FillSnapshot, RestoreSnapshot, SnapshotError};
use iggy_common::{header::PrepareHeader, message::Message};
use crate::stm::{State, StateMachine};
@@ -91,7 +92,58 @@ where
}
}
+/// Recursive case for variadic tuple pattern: (Head, Tail)
+/// Fills snapshot from head and tail, and restores both on restore.
+impl<SnapshotData, Head, Tail> FillSnapshot<SnapshotData> for variadic!(Head,
...Tail)
+where
+ Head: FillSnapshot<SnapshotData>,
+ Tail: FillSnapshot<SnapshotData>,
+{
+ fn fill_snapshot(&self, snapshot: &mut SnapshotData) -> Result<(),
SnapshotError> {
+ self.0.fill_snapshot(snapshot)?;
+ self.1.fill_snapshot(snapshot)?;
+ Ok(())
+ }
+}
+
+impl<SnapshotData, Head, Tail> RestoreSnapshot<SnapshotData> for
variadic!(Head, ...Tail)
+where
+ Head: RestoreSnapshot<SnapshotData>,
+ Tail: RestoreSnapshot<SnapshotData>,
+{
+ fn restore_snapshot(snapshot: &SnapshotData) -> Result<Self,
SnapshotError> {
+ let head = Head::restore_snapshot(snapshot)?;
+ let tail = Tail::restore_snapshot(snapshot)?;
+ Ok((head, tail))
+ }
+}
+
+impl<SnapshotData, T> FillSnapshot<SnapshotData> for MuxStateMachine<T>
+where
+ T: StateMachine + FillSnapshot<SnapshotData>,
+{
+ fn fill_snapshot(&self, snapshot: &mut SnapshotData) -> Result<(),
SnapshotError> {
+ self.inner.fill_snapshot(snapshot)
+ }
+}
+
+impl<SnapshotData, T> RestoreSnapshot<SnapshotData> for MuxStateMachine<T>
+where
+ T: StateMachine + RestoreSnapshot<SnapshotData>,
+{
+ fn restore_snapshot(snapshot: &SnapshotData) -> Result<Self,
SnapshotError> {
+ let inner = T::restore_snapshot(snapshot)?;
+ Ok(MuxStateMachine::new(inner))
+ }
+}
+
+#[allow(unused_imports)]
mod tests {
+ use super::*;
+ use crate::stm::consumer_group::{ConsumerGroups, ConsumerGroupsInner};
+ use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot,
RestoreSnapshot, Snapshotable};
+ use crate::stm::stream::{Streams, StreamsInner};
+ use crate::stm::user::{Users, UsersInner};
#[test]
fn construct_mux_state_machine_from_states_with_same_output() {
@@ -110,4 +162,78 @@ mod tests {
mux.update(input);
}
+
+ #[test]
+ fn mux_state_machine_snapshot_roundtrip() {
+ let users: Users = UsersInner::new().into();
+ let streams: Streams = StreamsInner::new().into();
+ let consumer_groups: ConsumerGroups =
ConsumerGroupsInner::new().into();
+
+ let mux = MuxStateMachine::new(variadic!(users, streams,
consumer_groups));
+
+ // Fill the typed snapshot
+ let mut snapshot = MetadataSnapshot::new(12345);
+ mux.fill_snapshot(&mut snapshot).unwrap();
+
+ // Verify all fields are filled
+ assert!(snapshot.users.is_some());
+ assert!(snapshot.streams.is_some());
+ assert!(snapshot.consumer_groups.is_some());
+
+ // Restore and verify
+ type MuxTuple = (Users, (Streams, (ConsumerGroups, ())));
+ let restored: MuxStateMachine<MuxTuple> =
+ MuxStateMachine::restore_snapshot(&snapshot).unwrap();
+
+ // Verify the restored mux produces the same snapshot
+ let mut verify_snapshot = MetadataSnapshot::new(0);
+ restored.fill_snapshot(&mut verify_snapshot).unwrap();
+ assert!(verify_snapshot.users.is_some());
+ assert!(verify_snapshot.streams.is_some());
+ assert!(verify_snapshot.consumer_groups.is_some());
+ }
+
+ #[test]
+ fn mux_state_machine_full_envelope_roundtrip() {
+ use crate::impls::metadata::IggySnapshot;
+ use crate::stm::snapshot::Snapshot;
+
+ let users: Users = UsersInner::new().into();
+ let streams: Streams = StreamsInner::new().into();
+ let consumer_groups: ConsumerGroups =
ConsumerGroupsInner::new().into();
+
+ type MuxTuple = (Users, (Streams, (ConsumerGroups, ())));
+ let mux: MuxStateMachine<MuxTuple> =
+ MuxStateMachine::new(variadic!(users, streams, consumer_groups));
+
+ let sequence_number = 12345u64;
+ let snapshot = IggySnapshot::create(&mux, sequence_number).unwrap();
+
+ assert_eq!(snapshot.sequence_number(), sequence_number);
+ assert!(snapshot.created_at() > 0);
+
+ // Encode to bytes
+ let encoded = snapshot.encode().unwrap();
+ assert!(!encoded.is_empty());
+
+ // Decode from bytes
+ let decoded = IggySnapshot::decode(&encoded).unwrap();
+ assert_eq!(decoded.sequence_number(), sequence_number);
+
+ // Verify snapshot fields are present
+ assert!(decoded.snapshot().users.is_some());
+ assert!(decoded.snapshot().streams.is_some());
+ assert!(decoded.snapshot().consumer_groups.is_some());
+
+ // Restore MuxStateMachine from the state side (symmetric with
fill_snapshot)
+ let restored: MuxStateMachine<MuxTuple> =
+ MuxStateMachine::restore_snapshot(decoded.snapshot()).unwrap();
+
+ // Verify restored state
+ let mut verify_snapshot = MetadataSnapshot::new(0);
+ restored.fill_snapshot(&mut verify_snapshot).unwrap();
+ assert!(verify_snapshot.users.is_some());
+ assert!(verify_snapshot.streams.is_some());
+ assert!(verify_snapshot.consumer_groups.is_some());
+ }
}
diff --git a/core/metadata/src/stm/snapshot.rs
b/core/metadata/src/stm/snapshot.rs
new file mode 100644
index 000000000..76210ceb6
--- /dev/null
+++ b/core/metadata/src/stm/snapshot.rs
@@ -0,0 +1,402 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use serde::{Deserialize, Serialize, de::DeserializeOwned};
+use std::fmt;
+
+use crate::stm::consumer_group::ConsumerGroupsSnapshot;
+use crate::stm::stream::StreamsSnapshot;
+use crate::stm::user::UsersSnapshot;
+
+#[derive(Debug)]
+pub enum SnapshotError {
+ /// Serialization failed.
+ Serialize(rmp_serde::encode::Error),
+ /// Deserialization failed.
+ Deserialize(rmp_serde::decode::Error),
+}
+
+impl fmt::Display for SnapshotError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ SnapshotError::Serialize(e) => write!(f, "snapshot serialization
failed: {}", e),
+ SnapshotError::Deserialize(e) => write!(f, "snapshot
deserialization failed: {}", e),
+ }
+ }
+}
+
+impl std::error::Error for SnapshotError {
+ fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
+ match self {
+ SnapshotError::Serialize(e) => Some(e),
+ SnapshotError::Deserialize(e) => Some(e),
+ }
+ }
+}
+
+/// The snapshot container for all metadata state machines.
+/// Each field corresponds to one state machine's serialized state.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct MetadataSnapshot {
+ /// Snapshot format version for forward/backward compatibility.
+ /// TODO(krishvishal): Properly handle versioning for snapshot. This is a
placeholder for now.
+ pub version: u32,
+ /// Timestamp when the snapshot was created (microseconds since epoch).
+ pub created_at: u64,
+ /// Monotonically increasing snapshot sequence number.
+ pub sequence_number: u64,
+ /// Users state machine snapshot data.
+ pub users: Option<UsersSnapshot>,
+ /// Streams state machine snapshot data.
+ pub streams: Option<StreamsSnapshot>,
+ /// Consumer groups state machine snapshot data.
+ pub consumer_groups: Option<ConsumerGroupsSnapshot>,
+}
+
+impl Default for MetadataSnapshot {
+ fn default() -> Self {
+ Self::new(0)
+ }
+}
+
+impl MetadataSnapshot {
+ /// Create a new snapshot with the given sequence number.
+ pub fn new(sequence_number: u64) -> Self {
+ Self {
+ version: 1,
+ created_at: iggy_common::IggyTimestamp::now().as_micros(),
+ sequence_number,
+ users: None,
+ streams: None,
+ consumer_groups: None,
+ }
+ }
+
+ /// Encode the snapshot to msgpack bytes.
+ pub fn encode(&self) -> Result<Vec<u8>, SnapshotError> {
+ rmp_serde::to_vec(self).map_err(SnapshotError::Serialize)
+ }
+
+ /// Decode a snapshot from msgpack bytes.
+ pub fn decode(bytes: &[u8]) -> Result<Self, SnapshotError> {
+ rmp_serde::from_slice(bytes).map_err(SnapshotError::Deserialize)
+ }
+}
+
+/// Trait for metadata snapshot implementations.
+///
+/// This is the high-level interface that concrete snapshot types (e.g.
`IggySnapshot`)
+/// must satisfy. It provides methods for creating, encoding, and decoding
snapshots.
+pub trait Snapshot: Sized {
+ /// The error type for snapshot operations.
+ type Error: std::error::Error;
+
+ /// The type used for snapshot sequence numbers.
+ type SequenceNumber;
+
+ /// The type used for snapshot timestamps.
+ type Timestamp;
+
+ /// The inner snapshot data structure that state machines fill and restore
from.
+ type Inner;
+
+ /// Create a snapshot from the current state of a state machine.
+ ///
+ /// # Arguments
+ /// * `stm` - The state machine to snapshot
+ /// * `sequence_number` - Monotonically increasing snapshot sequence number
+ fn create<T>(stm: &T, sequence_number: Self::SequenceNumber) ->
Result<Self, Self::Error>
+ where
+ T: FillSnapshot<Self::Inner>;
+
+ /// Encode the snapshot to msgpack bytes.
+ fn encode(&self) -> Result<Vec<u8>, Self::Error>;
+
+ /// Decode a snapshot from msgpack bytes.
+ fn decode(bytes: &[u8]) -> Result<Self, Self::Error>;
+
+ /// Get the snapshot sequence number.
+ fn sequence_number(&self) -> Self::SequenceNumber;
+
+ /// Get the timestamp when this snapshot was created.
+ fn created_at(&self) -> Self::Timestamp;
+}
+
+/// Trait implemented by each `{Name}Inner` state machine to support
snapshotting.
+/// Each state machine defines its own snapshot
+/// type for serialization and provides conversion methods.
+pub trait Snapshotable {
+ /// The serde-serializable snapshot representation of this state.
+ /// This should be a plain struct with only serializable types and no
wrappers
+ /// like `Arc`, `AtomicUsize`, or other non-serializable wrappers.
+ type Snapshot: Serialize + DeserializeOwned;
+
+ /// Convert the current in-memory state into a serializable snapshot.
+ fn to_snapshot(&self) -> Self::Snapshot;
+
+ /// Restore in-memory state from a snapshot representation.
+ fn from_snapshot(snapshot: Self::Snapshot) -> Result<Self, SnapshotError>
+ where
+ Self: Sized;
+}
+
+/// Trait for filling a typed snapshot with state machine data.
+///
+/// Each state machine implements this to write its serialized state
+pub trait FillSnapshot<S> {
+ /// Fill the snapshot with this state machine's data.
+ fn fill_snapshot(&self, snapshot: &mut S) -> Result<(), SnapshotError>;
+}
+
+/// Trait for restoring state machine data from a typed snapshot.
+///
+/// Each state machine implements this to read its state.
+pub trait RestoreSnapshot<S>: Sized {
+ /// Restore this state machine from the snapshot.
+ fn restore_snapshot(snapshot: &S) -> Result<Self, SnapshotError>;
+}
+
+/// Base case for the recursive tuple pattern - unit type terminates the
recursion.
+impl<S> FillSnapshot<S> for () {
+ fn fill_snapshot(&self, _snapshot: &mut S) -> Result<(), SnapshotError> {
+ Ok(())
+ }
+}
+
+impl<S> RestoreSnapshot<S> for () {
+ fn restore_snapshot(_snapshot: &S) -> Result<Self, SnapshotError> {
+ Ok(())
+ }
+}
+
+/// Generates `FillSnapshot` and `RestoreSnapshot` implementations for a
wrapper type.
+///
+/// The wrapper type (e.g. `Streams`) must implement `Snapshotable`.
+///
+/// # Example
+///
+/// ```ignore
+/// impl_fill_restore!(Users, users);
+/// ```
+#[macro_export]
+macro_rules! impl_fill_restore {
+ ($wrapper:ident, $field:ident) => {
+ impl
$crate::stm::snapshot::FillSnapshot<$crate::stm::snapshot::MetadataSnapshot>
+ for $wrapper
+ {
+ fn fill_snapshot(
+ &self,
+ snapshot: &mut $crate::stm::snapshot::MetadataSnapshot,
+ ) -> Result<(), $crate::stm::snapshot::SnapshotError> {
+ use $crate::stm::snapshot::Snapshotable;
+ snapshot.$field = Some(self.to_snapshot());
+ Ok(())
+ }
+ }
+
+ impl
$crate::stm::snapshot::RestoreSnapshot<$crate::stm::snapshot::MetadataSnapshot>
+ for $wrapper
+ {
+ fn restore_snapshot(
+ snapshot: &$crate::stm::snapshot::MetadataSnapshot,
+ ) -> Result<Self, $crate::stm::snapshot::SnapshotError> {
+ use serde::de::Error as _;
+ use $crate::stm::snapshot::{SnapshotError, Snapshotable};
+ let snap = snapshot.$field.clone().ok_or_else(|| {
+
SnapshotError::Deserialize(rmp_serde::decode::Error::custom(format_args!(
+ "Snapshot Restore Error: {}",
+ stringify!($field)
+ )))
+ })?;
+ Self::from_snapshot(snap)
+ }
+ }
+ };
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::stm::stream::{StatsSnapshot, StreamSnapshot};
+ use iggy_common::IggyTimestamp;
+
+ #[test]
+ fn test_metadata_snapshot_roundtrip() {
+ let snapshot = MetadataSnapshot::new(42);
+
+ let encoded = snapshot.encode().unwrap();
+ let decoded = MetadataSnapshot::decode(&encoded).unwrap();
+
+ assert_eq!(decoded.sequence_number, 42);
+ assert!(decoded.users.is_none());
+ assert!(decoded.streams.is_none());
+ assert!(decoded.consumer_groups.is_none());
+ }
+
+ #[test]
+ fn roundtrip_with_data() {
+ let ts = IggyTimestamp::from(1694968446131680u64);
+
+ let mut snapshot = MetadataSnapshot::new(100);
+ snapshot.streams = Some(StreamsSnapshot {
+ items: vec![(
+ 0,
+ StreamSnapshot {
+ id: 0,
+ name: "events".to_string(),
+ created_at: ts,
+ stats: StatsSnapshot {
+ size_bytes: 1024,
+ messages_count: 50,
+ segments_count: 2,
+ },
+ topics: vec![],
+ },
+ )],
+ });
+
+ let encoded = snapshot.encode().unwrap();
+ let decoded = MetadataSnapshot::decode(&encoded).unwrap();
+
+ assert_eq!(decoded.sequence_number, 100);
+ assert!(decoded.users.is_none());
+ assert!(decoded.consumer_groups.is_none());
+
+ let streams = decoded.streams.as_ref().unwrap();
+ assert_eq!(streams.items.len(), 1);
+
+ let (slab_id, stream) = &streams.items[0];
+ assert_eq!(*slab_id, 0);
+ assert_eq!(stream.name, "events");
+ assert_eq!(stream.created_at.as_micros(), ts.as_micros());
+ assert_eq!(stream.stats.size_bytes, 1024);
+ assert_eq!(stream.stats.messages_count, 50);
+ assert_eq!(stream.stats.segments_count, 2);
+ assert_eq!(stream.topics.len(), 0);
+ }
+
+ #[test]
+ fn roundtrip_with_slab_gaps() {
+ use crate::stm::stream::StreamsSnapshot;
+ use crate::stm::user::{PermissionerSnapshot, UserSnapshot,
UsersSnapshot};
+ use iggy_common::UserStatus;
+
+ let ts = IggyTimestamp::from(1694968446131680u64);
+
+ let users_snap = UsersSnapshot {
+ items: vec![
+ (
+ 0,
+ UserSnapshot {
+ id: 0,
+ username: "alice".to_string(),
+ password_hash: "hash_a".to_string(),
+ status: UserStatus::Active,
+ created_at: ts,
+ permissions: None,
+ },
+ ),
+ (
+ 2,
+ UserSnapshot {
+ id: 2,
+ username: "charlie".to_string(),
+ password_hash: "hash_c".to_string(),
+ status: UserStatus::Active,
+ created_at: ts,
+ permissions: None,
+ },
+ ),
+ ],
+ personal_access_tokens: vec![],
+ permissioner: PermissionerSnapshot {
+ users_permissions: vec![],
+ users_streams_permissions: vec![],
+ users_that_can_poll_messages_from_all_streams: vec![],
+ users_that_can_send_messages_to_all_streams: vec![],
+ users_that_can_poll_messages_from_specific_streams: vec![],
+ users_that_can_send_messages_to_specific_streams: vec![],
+ },
+ };
+
+ let streams_snap = StreamsSnapshot {
+ items: vec![
+ (
+ 0,
+ StreamSnapshot {
+ id: 0,
+ name: "stream-0".to_string(),
+ created_at: ts,
+ stats: StatsSnapshot {
+ size_bytes: 100,
+ messages_count: 10,
+ segments_count: 1,
+ },
+ topics: vec![],
+ },
+ ),
+ (
+ 3,
+ StreamSnapshot {
+ id: 3,
+ name: "stream-3".to_string(),
+ created_at: ts,
+ stats: StatsSnapshot {
+ size_bytes: 200,
+ messages_count: 20,
+ segments_count: 2,
+ },
+ topics: vec![],
+ },
+ ),
+ ],
+ };
+
+ let mut snapshot = MetadataSnapshot::new(99);
+ snapshot.users = Some(users_snap);
+ snapshot.streams = Some(streams_snap);
+
+ let encoded = snapshot.encode().unwrap();
+ let decoded = MetadataSnapshot::decode(&encoded).unwrap();
+
+ use crate::stm::user::Users;
+ let restored_users: Users =
RestoreSnapshot::restore_snapshot(&decoded).unwrap();
+
+ let mut verify = MetadataSnapshot::new(0);
+ restored_users.fill_snapshot(&mut verify).unwrap();
+ let users_snap = verify.users.unwrap();
+ assert_eq!(users_snap.items.len(), 2);
+ assert_eq!(users_snap.items[0].0, 0);
+ assert_eq!(users_snap.items[0].1.username, "alice");
+ assert_eq!(users_snap.items[0].1.id, 0);
+ assert_eq!(users_snap.items[1].0, 2);
+ assert_eq!(users_snap.items[1].1.username, "charlie");
+ assert_eq!(users_snap.items[1].1.id, 2);
+
+ use crate::stm::stream::Streams;
+ let restored_streams: Streams =
RestoreSnapshot::restore_snapshot(&decoded).unwrap();
+
+ let mut verify = MetadataSnapshot::new(0);
+ restored_streams.fill_snapshot(&mut verify).unwrap();
+ let streams_snap = verify.streams.unwrap();
+ assert_eq!(streams_snap.items.len(), 2);
+ assert_eq!(streams_snap.items[0].0, 0);
+ assert_eq!(streams_snap.items[0].1.name, "stream-0");
+ assert_eq!(streams_snap.items[1].0, 3);
+ assert_eq!(streams_snap.items[1].1.name, "stream-3");
+ }
+}
diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs
index 4f212cd20..7e4c95104 100644
--- a/core/metadata/src/stm/stream.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -17,7 +17,8 @@
use crate::stats::{StreamStats, TopicStats};
use crate::stm::StateHandler;
-use crate::{collect_handlers, define_state};
+use crate::stm::snapshot::Snapshotable;
+use crate::{collect_handlers, define_state, impl_fill_restore};
use ahash::AHashMap;
use iggy_common::create_partitions::CreatePartitions;
use iggy_common::create_stream::CreateStream;
@@ -30,9 +31,17 @@ use iggy_common::purge_topic::PurgeTopic;
use iggy_common::update_stream::UpdateStream;
use iggy_common::update_topic::UpdateTopic;
use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp,
MaxTopicSize};
+use serde::{Deserialize, Serialize};
use slab::Slab;
use std::sync::Arc;
-use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+/// Partition snapshot representation for serialization.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct PartitionSnapshot {
+ pub id: usize,
+ pub created_at: IggyTimestamp,
+}
#[derive(Debug, Clone)]
pub struct Partition {
@@ -46,6 +55,29 @@ impl Partition {
}
}
+/// Stats snapshot representation for serialization.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct StatsSnapshot {
+ pub size_bytes: u64,
+ pub messages_count: u64,
+ pub segments_count: u32,
+}
+
+/// Topic snapshot representation for serialization.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct TopicSnapshot {
+ pub id: usize,
+ pub name: String,
+ pub created_at: IggyTimestamp,
+ pub replication_factor: u8,
+ pub message_expiry: IggyExpiry,
+ pub compression_algorithm: CompressionAlgorithm,
+ pub max_topic_size: MaxTopicSize,
+ pub stats: StatsSnapshot,
+ pub partitions: Vec<PartitionSnapshot>,
+ pub round_robin_counter: usize,
+}
+
#[derive(Debug, Clone)]
pub struct Topic {
pub id: usize,
@@ -103,6 +135,16 @@ impl Topic {
}
}
+/// Stream snapshot representation for serialization.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct StreamSnapshot {
+ pub id: usize,
+ pub name: String,
+ pub created_at: IggyTimestamp,
+ pub stats: StatsSnapshot,
+ pub topics: Vec<(usize, TopicSnapshot)>,
+}
+
#[derive(Debug)]
pub struct Stream {
pub id: usize,
@@ -466,3 +508,147 @@ impl StateHandler for DeletePartitions {
}
}
}
+
+/// Snapshot of the whole Streams state machine; this is the `Snapshotable::Snapshot` type for `Streams`.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct StreamsSnapshot {
+ pub items: Vec<(usize, StreamSnapshot)>, // (stream slab key, stream) pairs
+}
+
+impl Snapshotable for Streams { // converts the live stream state to and from `StreamsSnapshot`
+ type Snapshot = StreamsSnapshot;
+
+ fn to_snapshot(&self) -> Self::Snapshot { // copies every stream, topic and partition into owned snapshot values
+ self.inner.read(|inner| { // the whole snapshot is built inside one read closure
+ let items: Vec<(usize, StreamSnapshot)> = inner
+ .items
+ .iter()
+ .map(|(stream_id, stream)| { // stream_id is the key yielded by the items iterator; from_snapshot reuses it as the slab key
+ let (size_bytes, messages_count, segments_count) =
+ stream.stats.load_for_snapshot();
+ let topics: Vec<(usize, TopicSnapshot)> = stream
+ .topics
+ .iter()
+ .map(|(topic_id, topic)| { // topic_id is the key within this stream's topic collection
+ let (t_size, t_msgs, t_segs) =
topic.stats.load_for_snapshot();
+ (
+ topic_id,
+ TopicSnapshot {
+ id: topic.id,
+ name: topic.name.to_string(),
+ created_at: topic.created_at,
+ replication_factor:
topic.replication_factor,
+ message_expiry: topic.message_expiry,
+ compression_algorithm:
topic.compression_algorithm,
+ max_topic_size: topic.max_topic_size,
+ stats: StatsSnapshot {
+ size_bytes: t_size,
+ messages_count: t_msgs,
+ segments_count: t_segs,
+ },
+ partitions: topic
+ .partitions
+ .iter()
+ .map(|p| PartitionSnapshot {
+ id: p.id,
+ created_at: p.created_at,
+ })
+ .collect(),
+ round_robin_counter: topic
+ .round_robin_counter
+ .load(Ordering::Relaxed), // only the current counter value is captured
+ },
+ )
+ })
+ .collect();
+ (
+ stream_id,
+ StreamSnapshot {
+ id: stream.id,
+ name: stream.name.to_string(),
+ created_at: stream.created_at,
+ stats: StatsSnapshot {
+ size_bytes,
+ messages_count,
+ segments_count,
+ },
+ topics,
+ },
+ )
+ })
+ .collect();
+ StreamsSnapshot { items }
+ })
+ }
+
+ fn from_snapshot( // rebuilds the name indexes, slabs and stats objects from a snapshot
+ snapshot: Self::Snapshot,
+ ) -> Result<Self, crate::stm::snapshot::SnapshotError> {
+ let mut index: AHashMap<Arc<str>, usize> = AHashMap::new(); // stream name -> slab key
+ let mut stream_entries: Vec<(usize, Stream)> = Vec::new();
+
+ for (slab_key, stream_snap) in snapshot.items {
+ let stream_stats = Arc::new(StreamStats::default()); // fresh stats object, then loaded with the snapshot values
+ stream_stats.store_from_snapshot(
+ stream_snap.stats.size_bytes,
+ stream_snap.stats.messages_count,
+ stream_snap.stats.segments_count,
+ );
+
+ let mut topic_index: AHashMap<Arc<str>, usize> = AHashMap::new(); // topic name -> slab key, per stream
+ let mut topic_entries: Vec<(usize, Topic)> = Vec::new();
+
+ for (topic_slab_key, topic_snap) in stream_snap.topics {
+ let topic_stats =
Arc::new(TopicStats::new(stream_stats.clone())); // topic stats are constructed with a handle to the stream stats
+ topic_stats.store_from_snapshot( // NOTE(review): confirm this does not also add to the parent stream stats set above
+ topic_snap.stats.size_bytes,
+ topic_snap.stats.messages_count,
+ topic_snap.stats.segments_count,
+ );
+ let topic_name: Arc<str> = Arc::from(topic_snap.name.as_str());
+ let topic = Topic {
+ id: topic_snap.id,
+ name: topic_name.clone(),
+ created_at: topic_snap.created_at,
+ replication_factor: topic_snap.replication_factor,
+ message_expiry: topic_snap.message_expiry,
+ compression_algorithm: topic_snap.compression_algorithm,
+ max_topic_size: topic_snap.max_topic_size,
+ stats: topic_stats,
+ partitions: topic_snap
+ .partitions
+ .into_iter()
+ .map(|p| Partition {
+ id: p.id,
+ created_at: p.created_at,
+ })
+ .collect(),
+ round_robin_counter:
Arc::new(AtomicUsize::new(topic_snap.round_robin_counter)),
+ };
+ topic_index.insert(topic_name, topic_slab_key);
+ topic_entries.push((topic_slab_key, topic));
+ }
+
+ let topics: Slab<Topic> = topic_entries.into_iter().collect(); // built from (key, value) pairs so each topic keeps its recorded key
+
+ let stream_name: Arc<str> = Arc::from(stream_snap.name.as_str());
+ let stream = Stream {
+ id: stream_snap.id,
+ name: stream_name.clone(),
+ created_at: stream_snap.created_at,
+ stats: stream_stats,
+ topics,
+ topic_index,
+ };
+
+ index.insert(stream_name, slab_key);
+ stream_entries.push((slab_key, stream));
+ }
+
+ let items: Slab<Stream> = stream_entries.into_iter().collect(); // built from (key, value) pairs so each stream keeps its recorded key
+ let inner = StreamsInner { index, items };
+ Ok(inner.into()) // no failure path exists in this body; it always returns Ok
+ }
+}
+
+impl_fill_restore!(Streams, streams);
diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs
index 88bc178ea..14f71f3af 100644
--- a/core/metadata/src/stm/user.rs
+++ b/core/metadata/src/stm/user.rs
@@ -17,7 +17,8 @@
use crate::permissioner::Permissioner;
use crate::stm::StateHandler;
-use crate::{collect_handlers, define_state};
+use crate::stm::snapshot::Snapshotable;
+use crate::{collect_handlers, define_state, impl_fill_restore};
use ahash::AHashMap;
use iggy_common::change_password::ChangePassword;
use iggy_common::create_personal_access_token::CreatePersonalAccessToken;
@@ -26,7 +27,11 @@ use
iggy_common::delete_personal_access_token::DeletePersonalAccessToken;
use iggy_common::delete_user::DeleteUser;
use iggy_common::update_permissions::UpdatePermissions;
use iggy_common::update_user::UpdateUser;
-use iggy_common::{IggyTimestamp, Permissions, PersonalAccessToken, UserId,
UserStatus};
+use iggy_common::{
+ GlobalPermissions, IggyTimestamp, Permissions, PersonalAccessToken,
StreamPermissions, UserId,
+ UserStatus,
+};
+use serde::{Deserialize, Serialize};
use slab::Slab;
use std::sync::Arc;
@@ -258,3 +263,219 @@ impl StateHandler for DeletePersonalAccessToken {
}
}
}
+
+/// Serializable form of a `User`, with owned strings in place of the `Arc`-held username and password hash.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct UserSnapshot {
+ pub id: UserId,
+ pub username: String,
+ pub password_hash: String,
+ pub status: UserStatus,
+ pub created_at: IggyTimestamp,
+ pub permissions: Option<Permissions>, // cloned out of the user's Arc on save, wrapped in a new Arc on restore
+}
+
+/// Serializable form of a `PersonalAccessToken`; restored through `PersonalAccessToken::raw` with these four fields.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct PersonalAccessTokenSnapshot {
+ pub user_id: UserId,
+ pub name: String,
+ pub token: String, // NOTE(review): presumably the stored (hashed) token value rather than the raw secret — confirm
+ pub expiry_at: Option<IggyTimestamp>,
+}
+
+/// Serializable copy of the `Permissioner` tables; every collection is flattened into a `Vec` with the same field name.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct PermissionerSnapshot {
+ pub users_permissions: Vec<(UserId, GlobalPermissions)>,
+ pub users_streams_permissions: Vec<((UserId, usize), StreamPermissions)>, // keyed by (user id, usize); the usize is presumably a stream id — confirm
+ pub users_that_can_poll_messages_from_all_streams: Vec<UserId>,
+ pub users_that_can_send_messages_to_all_streams: Vec<UserId>,
+ pub users_that_can_poll_messages_from_specific_streams: Vec<(UserId,
usize)>,
+ pub users_that_can_send_messages_to_specific_streams: Vec<(UserId, usize)>,
+}
+
+/// Snapshot of the whole Users state machine; this is the `Snapshotable::Snapshot` type for `Users`.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct UsersSnapshot {
+ pub items: Vec<(usize, UserSnapshot)>, // (slab key, user) pairs
+ pub personal_access_tokens: Vec<(UserId, Vec<(String,
PersonalAccessTokenSnapshot)>)>, // per user id: (token map key, token) pairs
+ pub permissioner: PermissionerSnapshot,
+}
+
+impl Snapshotable for Users { // converts the live user state to and from `UsersSnapshot`
+ type Snapshot = UsersSnapshot;
+
+ fn to_snapshot(&self) -> Self::Snapshot { // copies users, personal access tokens and permissioner tables into owned values
+ self.inner.read(|inner| { // the whole snapshot is built inside one read closure
+ let items: Vec<(usize, UserSnapshot)> = inner
+ .items
+ .iter()
+ .map(|(user_id, user)| { // here user_id is the key yielded by the items iterator; user.id is stored separately
+ (
+ user_id,
+ UserSnapshot {
+ id: user.id,
+ username: user.username.to_string(),
+ password_hash: user.password_hash.to_string(),
+ status: user.status,
+ created_at: user.created_at,
+ permissions: user.permissions.as_ref().map(|p|
(**p).clone()),
+ },
+ )
+ })
+ .collect();
+
+ let personal_access_tokens: Vec<(UserId, Vec<(String,
PersonalAccessTokenSnapshot)>)> =
+ inner
+ .personal_access_tokens
+ .iter()
+ .map(|(&user_id, tokens)| { // one output entry per user id present in the token map
+ let token_list: Vec<(String,
PersonalAccessTokenSnapshot)> = tokens
+ .iter()
+ .map(|(name, pat)| { // the map key is saved next to the token's own name field
+ (
+ name.to_string(),
+ PersonalAccessTokenSnapshot {
+ user_id: pat.user_id,
+ name: pat.name.to_string(),
+ token: pat.token.to_string(),
+ expiry_at: pat.expiry_at,
+ },
+ )
+ })
+ .collect();
+ (user_id, token_list)
+ })
+ .collect();
+
+ let permissioner = PermissionerSnapshot { // each permissioner collection is flattened into a Vec
+ users_permissions: inner
+ .permissioner
+ .users_permissions
+ .iter()
+ .map(|(&k, v)| (k, v.clone()))
+ .collect(),
+ users_streams_permissions: inner
+ .permissioner
+ .users_streams_permissions
+ .iter()
+ .map(|(&k, v)| (k, v.clone()))
+ .collect(),
+ users_that_can_poll_messages_from_all_streams: inner
+ .permissioner
+ .users_that_can_poll_messages_from_all_streams
+ .iter()
+ .copied()
+ .collect(),
+ users_that_can_send_messages_to_all_streams: inner
+ .permissioner
+ .users_that_can_send_messages_to_all_streams
+ .iter()
+ .copied()
+ .collect(),
+ users_that_can_poll_messages_from_specific_streams: inner
+ .permissioner
+ .users_that_can_poll_messages_from_specific_streams
+ .iter()
+ .copied()
+ .collect(),
+ users_that_can_send_messages_to_specific_streams: inner
+ .permissioner
+ .users_that_can_send_messages_to_specific_streams
+ .iter()
+ .copied()
+ .collect(),
+ };
+
+ UsersSnapshot {
+ items,
+ personal_access_tokens,
+ permissioner,
+ }
+ })
+ }
+
+ fn from_snapshot( // rebuilds the username index, user slab, token maps and permissioner from a snapshot
+ snapshot: Self::Snapshot,
+ ) -> Result<Self, crate::stm::snapshot::SnapshotError> {
+ let mut index: AHashMap<Arc<str>, UserId> = AHashMap::new(); // username -> user id
+ let mut user_entries: Vec<(usize, User)> = Vec::new();
+
+ for (slab_key, user_snap) in snapshot.items {
+ let username: Arc<str> = Arc::from(user_snap.username.as_str());
+ let user = User {
+ id: user_snap.id,
+ username: username.clone(),
+ password_hash: Arc::from(user_snap.password_hash.as_str()),
+ status: user_snap.status,
+ created_at: user_snap.created_at,
+ permissions: user_snap.permissions.map(Arc::new), // re-wrap the owned permissions in a new Arc
+ };
+
+ index.insert(username, slab_key as UserId); // NOTE(review): unchecked `as` cast, and the index uses the slab key rather than user_snap.id — confirm they always match and fit in UserId
+ user_entries.push((slab_key, user));
+ }
+
+ let items: Slab<User> = user_entries.into_iter().collect(); // built from (key, value) pairs so each user keeps its recorded key
+
+ let mut personal_access_tokens: AHashMap<UserId, AHashMap<Arc<str>,
PersonalAccessToken>> =
+ AHashMap::new();
+ for (user_id, tokens) in snapshot.personal_access_tokens {
+ let mut token_map: AHashMap<Arc<str>, PersonalAccessToken> =
AHashMap::new();
+ for (name, pat_snap) in tokens {
+ let pat = PersonalAccessToken::raw(
+ pat_snap.user_id,
+ &pat_snap.name,
+ &pat_snap.token,
+ pat_snap.expiry_at,
+ );
+ token_map.insert(Arc::from(name.as_str()), pat); // keyed by the saved map key, not by pat_snap.name
+ }
+ personal_access_tokens.insert(user_id, token_map);
+ }
+
+ let permissioner = Permissioner { // every Vec is collected back into the target field's own collection type
+ users_permissions: snapshot
+ .permissioner
+ .users_permissions
+ .into_iter()
+ .collect(),
+ users_streams_permissions: snapshot
+ .permissioner
+ .users_streams_permissions
+ .into_iter()
+ .collect(),
+ users_that_can_poll_messages_from_all_streams: snapshot
+ .permissioner
+ .users_that_can_poll_messages_from_all_streams
+ .into_iter()
+ .collect(),
+ users_that_can_send_messages_to_all_streams: snapshot
+ .permissioner
+ .users_that_can_send_messages_to_all_streams
+ .into_iter()
+ .collect(),
+ users_that_can_poll_messages_from_specific_streams: snapshot
+ .permissioner
+ .users_that_can_poll_messages_from_specific_streams
+ .into_iter()
+ .collect(),
+ users_that_can_send_messages_to_specific_streams: snapshot
+ .permissioner
+ .users_that_can_send_messages_to_specific_streams
+ .into_iter()
+ .collect(),
+ };
+
+ let inner = UsersInner {
+ index,
+ items,
+ personal_access_tokens,
+ permissioner,
+ };
+ Ok(inner.into()) // no failure path exists in this body; it always returns Ok
+ }
+}
+
+impl_fill_restore!(Users, users);