Add RRD (Round-Robin Database) file persistence system: - RrdWriter: Main API for RRD operations - Schema definitions for CPU, memory, network metrics - Format migration support (v1/v2/v3) - rrdcached integration for batched writes - Data transformation for legacy formats
This is an independent crate with no internal dependencies, only requiring external RRD libraries (rrd, rrdcached-client) and tokio for async operations. It handles time-series data storage compatible with the C implementation. Includes comprehensive unit tests for data transformation, schema generation, and multi-source data processing. Signed-off-by: Kefu Chai <[email protected]> --- src/pmxcfs-rs/Cargo.toml | 1 + src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml | 18 + src/pmxcfs-rs/pmxcfs-rrd/README.md | 51 ++ src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs | 67 ++ .../pmxcfs-rrd/src/backend/backend_daemon.rs | 214 +++++++ .../pmxcfs-rrd/src/backend/backend_direct.rs | 606 ++++++++++++++++++ .../src/backend/backend_fallback.rs | 229 +++++++ src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs | 140 ++++ src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs | 313 +++++++++ src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs | 21 + src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs | 577 +++++++++++++++++ src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs | 397 ++++++++++++ 12 files changed, 2634 insertions(+) create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/README.md create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_daemon.rs create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_direct.rs create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_fallback.rs create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml index 4d17e87e..dd36c81f 100644 --- a/src/pmxcfs-rs/Cargo.toml +++ b/src/pmxcfs-rs/Cargo.toml @@ -4,6 +4,7 @@ members = [ "pmxcfs-api-types", # Shared types and error definitions "pmxcfs-config", # Configuration management "pmxcfs-logger", # Cluster log with ring buffer and deduplication + "pmxcfs-rrd", # RRD (Round-Robin Database) persistence ] resolver = "2" diff --git a/src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml b/src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml new file mode 100644 index 00000000..bab71423 --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "pmxcfs-rrd" +version.workspace = true +edition.workspace = true +authors.workspace = true +license.workspace = true + +[dependencies] +anyhow.workspace = true +async-trait = "0.1" +chrono = { version = "0.4", default-features = false, features = ["clock"] } +rrd = "0.2" +rrdcached-client = "0.1.5" +tokio.workspace = true +tracing.workspace = true + +[dev-dependencies] +tempfile.workspace = true diff --git a/src/pmxcfs-rs/pmxcfs-rrd/README.md b/src/pmxcfs-rs/pmxcfs-rrd/README.md new file mode 100644 index 00000000..800d78cf --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-rrd/README.md @@ -0,0 +1,51 @@ +# pmxcfs-rrd + +RRD (Round-Robin Database) persistence for pmxcfs performance metrics. + +## Overview + +This crate provides RRD file management for storing time-series performance data from Proxmox nodes and VMs. It handles file creation, updates, and integration with rrdcached daemon for efficient writes. + +### Key Features + +- RRD file creation with schema-based initialization +- RRD updates (write metrics to disk) +- rrdcached integration for batched writes +- Support for both legacy and current schema versions +- Type-safe key parsing and validation +- Compatible with existing C-created RRD files + +## Module Structure + +| Module | Purpose | +|--------|---------| +| `writer.rs` | Main RrdWriter API | +| `schema.rs` | RRD schema definitions (DS, RRA) | +| `key_type.rs` | RRD key parsing and validation | +| `daemon.rs` | rrdcached daemon client | + +## External Dependencies + +- **librrd**: RRDtool library (via FFI bindings) +- **rrdcached**: Optional daemon for batched writes and improved performance + +## Testing + +Unit tests verify: +- Schema generation and validation +- Key parsing for different RRD types (node, VM, storage) +- RRD file creation and update operations +- rrdcached client connection and fallback behavior + +Run tests with: +```bash +cargo test -p pmxcfs-rrd +``` + +## References + +- **C Implementation**: `src/pmxcfs/status.c` (RRD code embedded) +- **Related Crates**: + - `pmxcfs-status` - Uses RrdWriter for metrics persistence + - `pmxcfs` - FUSE `.rrd` plugin reads RRD files +- **RRDtool Documentation**: https://oss.oetiker.ch/rrdtool/ diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs new file mode 100644 index 00000000..58652831 --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs @@ -0,0 +1,67 @@ +/// RRD Backend Trait and Implementations +/// +/// This module provides an abstraction over different RRD writing mechanisms: +/// - Daemon-based (via rrdcached) for performance and batching +/// - Direct file writing for reliability and fallback scenarios +/// - Fallback composite that tries daemon first, then falls back to direct +/// +/// This design matches the C implementation's behavior in status.c where +/// it attempts daemon update first, then falls back to direct file writes. +use super::schema::RrdSchema; +use anyhow::Result; +use async_trait::async_trait; +use std::path::Path; + +/// Trait for RRD backend implementations +/// +/// Provides abstraction over different RRD writing mechanisms. +/// All methods are async to support both async (daemon) and sync (direct file) operations. +#[async_trait] +pub trait RrdBackend: Send + Sync { + /// Update RRD file with new data + /// + /// # Arguments + /// * `file_path` - Full path to the RRD file + /// * `data` - Update data in format "timestamp:value1:value2:..." + async fn update(&mut self, file_path: &Path, data: &str) -> Result<()>; + + /// Create new RRD file with schema + /// + /// # Arguments + /// * `file_path` - Full path where RRD file should be created + /// * `schema` - RRD schema defining data sources and archives + /// * `start_timestamp` - Start time for the RRD file (Unix timestamp) + async fn create( + &mut self, + file_path: &Path, + schema: &RrdSchema, + start_timestamp: i64, + ) -> Result<()>; + + /// Flush pending updates to disk + /// + /// For daemon backends, this sends a FLUSH command. + /// For direct backends, this is a no-op (writes are immediate). + #[allow(dead_code)] // Used in backend implementations via trait dispatch + async fn flush(&mut self) -> Result<()>; + + /// Check if backend is available and healthy + /// + /// Returns true if the backend can be used for operations. + /// For daemon backends, this checks if the connection is alive. + /// For direct backends, this always returns true. + #[allow(dead_code)] // Used in fallback backend via trait dispatch + async fn is_available(&self) -> bool; + + /// Get a human-readable name for this backend + fn name(&self) -> &str; +} + +// Backend implementations +mod backend_daemon; +mod backend_direct; +mod backend_fallback; + +pub use backend_daemon::RrdCachedBackend; +pub use backend_direct::RrdDirectBackend; +pub use backend_fallback::RrdFallbackBackend; diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_daemon.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_daemon.rs new file mode 100644 index 00000000..28c1a99a --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_daemon.rs @@ -0,0 +1,214 @@ +/// RRD Backend: rrdcached daemon +/// +/// Uses rrdcached for batched, high-performance RRD updates. +/// This is the preferred backend when the daemon is available. +use super::super::schema::RrdSchema; +use anyhow::{Context, Result}; +use async_trait::async_trait; +use rrdcached_client::RRDCachedClient; +use rrdcached_client::consolidation_function::ConsolidationFunction; +use rrdcached_client::create::{ + CreateArguments, CreateDataSource, CreateDataSourceType, CreateRoundRobinArchive, +}; +use std::path::Path; + +/// RRD backend using rrdcached daemon +pub struct RrdCachedBackend { + client: RRDCachedClient<tokio::net::UnixStream>, +} + +impl RrdCachedBackend { + /// Connect to rrdcached daemon + /// + /// # Arguments + /// * `socket_path` - Path to rrdcached Unix socket (default: /var/run/rrdcached.sock) + pub async fn connect(socket_path: &str) -> Result<Self> { + let client = RRDCachedClient::connect_unix(socket_path) + .await + .with_context(|| format!("Failed to connect to rrdcached at {socket_path}"))?; + + tracing::info!("Connected to rrdcached at {}", socket_path); + + Ok(Self { client }) + } +} + +#[async_trait] +impl super::super::backend::RrdBackend for RrdCachedBackend { + async fn update(&mut self, file_path: &Path, data: &str) -> Result<()> { + // Parse the update data + let parts: Vec<&str> = data.split(':').collect(); + if parts.len() < 2 { + anyhow::bail!("Invalid update data format: {data}"); + } + + let timestamp = if parts[0] == "N" { + None + } else { + Some( + parts[0] + .parse::<usize>() + .with_context(|| format!("Invalid timestamp: {}", parts[0]))?, + ) + }; + + let values: Vec<f64> = parts[1..] + .iter() + .map(|v| { + if *v == "U" { + Ok(f64::NAN) + } else { + v.parse::<f64>() + .with_context(|| format!("Invalid value: {v}")) + } + }) + .collect::<Result<Vec<_>>>()?; + + // Get file path without .rrd extension (rrdcached-client adds it) + let path_str = file_path.to_string_lossy(); + let path_without_ext = path_str.strip_suffix(".rrd").unwrap_or(&path_str); + + // Send update via rrdcached + self.client + .update(path_without_ext, timestamp, values) + .await + .with_context(|| format!("rrdcached update failed for {:?}", file_path))?; + + tracing::trace!("Updated RRD via daemon: {:?} -> {}", file_path, data); + + Ok(()) + } + + async fn create( + &mut self, + file_path: &Path, + schema: &RrdSchema, + start_timestamp: i64, + ) -> Result<()> { + tracing::debug!( + "Creating RRD file via daemon: {:?} with {} data sources", + file_path, + schema.column_count() + ); + + // Convert our data sources to rrdcached-client CreateDataSource objects + let mut data_sources = Vec::new(); + for ds in &schema.data_sources { + let serie_type = match ds.ds_type { + "GAUGE" => CreateDataSourceType::Gauge, + "DERIVE" => CreateDataSourceType::Derive, + "COUNTER" => CreateDataSourceType::Counter, + "ABSOLUTE" => CreateDataSourceType::Absolute, + _ => anyhow::bail!("Unsupported data source type: {}", ds.ds_type), + }; + + // Parse min/max values + let minimum = if ds.min == "U" { + None + } else { + ds.min.parse().ok() + }; + let maximum = if ds.max == "U" { + None + } else { + ds.max.parse().ok() + }; + + let data_source = CreateDataSource { + name: ds.name.to_string(), + minimum, + maximum, + heartbeat: ds.heartbeat as i64, + serie_type, + }; + + data_sources.push(data_source); + } + + // Convert our RRA definitions to rrdcached-client CreateRoundRobinArchive objects + let mut archives = Vec::new(); + for rra in &schema.archives { + // Parse RRA string: "RRA:AVERAGE:0.5:1:70" + let parts: Vec<&str> = rra.split(':').collect(); + if parts.len() != 5 || parts[0] != "RRA" { + anyhow::bail!("Invalid RRA format: {rra}"); + } + + let consolidation_function = match parts[1] { + "AVERAGE" => ConsolidationFunction::Average, + "MIN" => ConsolidationFunction::Min, + "MAX" => ConsolidationFunction::Max, + "LAST" => ConsolidationFunction::Last, + _ => anyhow::bail!("Unsupported consolidation function: {}", parts[1]), + }; + + let xfiles_factor: f64 = parts[2] + .parse() + .with_context(|| format!("Invalid xff in RRA: {rra}"))?; + let steps: i64 = parts[3] + .parse() + .with_context(|| format!("Invalid steps in RRA: {rra}"))?; + let rows: i64 = parts[4] + .parse() + .with_context(|| format!("Invalid rows in RRA: {rra}"))?; + + let archive = CreateRoundRobinArchive { + consolidation_function, + xfiles_factor, + steps, + rows, + }; + archives.push(archive); + } + + // Get path without .rrd extension (rrdcached-client adds it) + let path_str = file_path.to_string_lossy(); + let path_without_ext = path_str + .strip_suffix(".rrd") + .unwrap_or(&path_str) + .to_string(); + + // Create CreateArguments + let create_args = CreateArguments { + path: path_without_ext, + data_sources, + round_robin_archives: archives, + start_timestamp: start_timestamp as u64, + step_seconds: 60, // 60-second step (1 minute resolution) + }; + + // Validate before sending + create_args.validate().context("Invalid CREATE arguments")?; + + // Send CREATE command via rrdcached + self.client + .create(create_args) + .await + .with_context(|| format!("Failed to create RRD file via daemon: {file_path:?}"))?; + + tracing::info!("Created RRD file via daemon: {:?} ({})", file_path, schema); + + Ok(()) + } + + async fn flush(&mut self) -> Result<()> { + self.client + .flush_all() + .await + .context("Failed to flush rrdcached")?; + + tracing::debug!("Flushed all pending RRD updates"); + + Ok(()) + } + + async fn is_available(&self) -> bool { + // For now, assume we're available if we have a client + // Could add a PING command in the future + true + } + + fn name(&self) -> &str { + "rrdcached" + } +} diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_direct.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_direct.rs new file mode 100644 index 00000000..6be3eb5d --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_direct.rs @@ -0,0 +1,606 @@ +/// RRD Backend: Direct file writing +/// +/// Uses the `rrd` crate (librrd bindings) for direct RRD file operations. +/// This backend is used as a fallback when rrdcached is unavailable. +/// +/// This matches the C implementation's behavior in status.c:1416-1420 where +/// it falls back to rrd_update_r() and rrd_create_r() for direct file access. +use super::super::schema::RrdSchema; +use anyhow::{Context, Result}; +use async_trait::async_trait; +use std::path::Path; +use std::time::Duration; + +/// RRD backend using direct file operations via librrd +pub struct RrdDirectBackend { + // Currently stateless, but kept as struct for future enhancements +} + +impl RrdDirectBackend { + /// Create a new direct file backend + pub fn new() -> Self { + tracing::info!("Using direct RRD file backend (via librrd)"); + Self {} + } +} + +impl Default for RrdDirectBackend { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl super::super::backend::RrdBackend for RrdDirectBackend { + async fn update(&mut self, file_path: &Path, data: &str) -> Result<()> { + let path = file_path.to_path_buf(); + let data_str = data.to_string(); + + // Use tokio::task::spawn_blocking for sync rrd operations + // This prevents blocking the async runtime + tokio::task::spawn_blocking(move || { + // Parse the update data to extract timestamp and values + // Format: "timestamp:value1:value2:..." + let parts: Vec<&str> = data_str.split(':').collect(); + if parts.is_empty() { + anyhow::bail!("Empty update data"); + } + + // Use rrd::ops::update::update_all_with_timestamp + // This is the most direct way to update RRD files + let timestamp_str = parts[0]; + let timestamp: i64 = if timestamp_str == "N" { + // "N" means "now" in RRD terminology + chrono::Utc::now().timestamp() + } else { + timestamp_str + .parse() + .with_context(|| format!("Invalid timestamp: {}", timestamp_str))? + }; + + let timestamp = chrono::DateTime::from_timestamp(timestamp, 0) + .ok_or_else(|| anyhow::anyhow!("Invalid timestamp value: {}", timestamp))?; + + // Convert values to Datum + let values: Vec<rrd::ops::update::Datum> = parts[1..] + .iter() + .map(|v| { + if *v == "U" { + // Unknown/unspecified value + rrd::ops::update::Datum::Unspecified + } else if let Ok(int_val) = v.parse::<u64>() { + rrd::ops::update::Datum::Int(int_val) + } else if let Ok(float_val) = v.parse::<f64>() { + rrd::ops::update::Datum::Float(float_val) + } else { + rrd::ops::update::Datum::Unspecified + } + }) + .collect(); + + // Perform the update + rrd::ops::update::update_all( + &path, + rrd::ops::update::ExtraFlags::empty(), + &[( + rrd::ops::update::BatchTime::Timestamp(timestamp), + values.as_slice(), + )], + ) + .with_context(|| format!("Direct RRD update failed for {:?}", path))?; + + tracing::trace!("Updated RRD via direct file: {:?} -> {}", path, data_str); + + Ok::<(), anyhow::Error>(()) + }) + .await + .context("Failed to spawn blocking task for RRD update")??; + + Ok(()) + } + + async fn create( + &mut self, + file_path: &Path, + schema: &RrdSchema, + start_timestamp: i64, + ) -> Result<()> { + tracing::debug!( + "Creating RRD file via direct: {:?} with {} data sources", + file_path, + schema.column_count() + ); + + let path = file_path.to_path_buf(); + let schema = schema.clone(); + + // Ensure parent directory exists + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent) + .with_context(|| format!("Failed to create directory: {parent:?}"))?; + } + + // Use tokio::task::spawn_blocking for sync rrd operations + tokio::task::spawn_blocking(move || { + // Convert timestamp + let start = chrono::DateTime::from_timestamp(start_timestamp, 0) + .ok_or_else(|| anyhow::anyhow!("Invalid start timestamp: {}", start_timestamp))?; + + // Convert data sources + let data_sources: Vec<rrd::ops::create::DataSource> = schema + .data_sources + .iter() + .map(|ds| { + let name = rrd::ops::create::DataSourceName::new(ds.name); + + match ds.ds_type { + "GAUGE" => { + let min = if ds.min == "U" { + None + } else { + Some(ds.min.parse().context("Invalid min value")?) + }; + let max = if ds.max == "U" { + None + } else { + Some(ds.max.parse().context("Invalid max value")?) + }; + Ok(rrd::ops::create::DataSource::gauge( + name, + ds.heartbeat, + min, + max, + )) + } + "DERIVE" => { + let min = if ds.min == "U" { + None + } else { + Some(ds.min.parse().context("Invalid min value")?) + }; + let max = if ds.max == "U" { + None + } else { + Some(ds.max.parse().context("Invalid max value")?) + }; + Ok(rrd::ops::create::DataSource::derive( + name, + ds.heartbeat, + min, + max, + )) + } + "COUNTER" => { + let min = if ds.min == "U" { + None + } else { + Some(ds.min.parse().context("Invalid min value")?) + }; + let max = if ds.max == "U" { + None + } else { + Some(ds.max.parse().context("Invalid max value")?) + }; + Ok(rrd::ops::create::DataSource::counter( + name, + ds.heartbeat, + min, + max, + )) + } + "ABSOLUTE" => { + let min = if ds.min == "U" { + None + } else { + Some(ds.min.parse().context("Invalid min value")?) + }; + let max = if ds.max == "U" { + None + } else { + Some(ds.max.parse().context("Invalid max value")?) + }; + Ok(rrd::ops::create::DataSource::absolute( + name, + ds.heartbeat, + min, + max, + )) + } + _ => anyhow::bail!("Unsupported data source type: {}", ds.ds_type), + } + }) + .collect::<Result<Vec<_>>>()?; + + // Convert RRAs + let archives: Result<Vec<rrd::ops::create::Archive>> = schema + .archives + .iter() + .map(|rra| { + // Parse RRA string: "RRA:AVERAGE:0.5:1:1440" + let parts: Vec<&str> = rra.split(':').collect(); + if parts.len() != 5 || parts[0] != "RRA" { + anyhow::bail!("Invalid RRA format: {}", rra); + } + + let cf = match parts[1] { + "AVERAGE" => rrd::ConsolidationFn::Avg, + "MIN" => rrd::ConsolidationFn::Min, + "MAX" => rrd::ConsolidationFn::Max, + "LAST" => rrd::ConsolidationFn::Last, + _ => anyhow::bail!("Unsupported consolidation function: {}", parts[1]), + }; + + let xff: f64 = parts[2] + .parse() + .with_context(|| format!("Invalid xff in RRA: {}", rra))?; + let steps: u32 = parts[3] + .parse() + .with_context(|| format!("Invalid steps in RRA: {}", rra))?; + let rows: u32 = parts[4] + .parse() + .with_context(|| format!("Invalid rows in RRA: {}", rra))?; + + rrd::ops::create::Archive::new(cf, xff, steps, rows) + .map_err(|e| anyhow::anyhow!("Failed to create archive: {}", e)) + }) + .collect(); + + let archives = archives?; + + // Call rrd::ops::create::create + rrd::ops::create::create( + &path, + start, + Duration::from_secs(60), // 60-second step + false, // no_overwrite = false + None, // template + &[], // sources + data_sources.iter(), + archives.iter(), + ) + .with_context(|| format!("Direct RRD create failed for {:?}", path))?; + + tracing::info!("Created RRD file via direct: {:?} ({})", path, schema); + + Ok::<(), anyhow::Error>(()) + }) + .await + .context("Failed to spawn blocking task for RRD create")??; + + Ok(()) + } + + async fn flush(&mut self) -> Result<()> { + // No-op for direct backend - writes are immediate + tracing::trace!("Flush called on direct backend (no-op)"); + Ok(()) + } + + async fn is_available(&self) -> bool { + // Direct backend is always available (no external dependencies) + true + } + + fn name(&self) -> &str { + "direct" + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::backend::RrdBackend; + use crate::schema::{RrdFormat, RrdSchema}; + use std::path::PathBuf; + use tempfile::TempDir; + + // ===== Test Helpers ===== + + /// Create a temporary directory for RRD files + fn setup_temp_dir() -> TempDir { + TempDir::new().expect("Failed to create temp directory") + } + + /// Create a test RRD file path + fn test_rrd_path(dir: &TempDir, name: &str) -> PathBuf { + dir.path().join(format!("{}.rrd", name)) + } + + // ===== RrdDirectBackend Tests ===== + + #[tokio::test] + async fn test_direct_backend_create_node_rrd() { + let temp_dir = setup_temp_dir(); + let rrd_path = test_rrd_path(&temp_dir, "node_test"); + + let mut backend = RrdDirectBackend::new(); + let schema = RrdSchema::node(RrdFormat::Pve9_0); + let start_time = 1704067200; // 2024-01-01 00:00:00 + + // Create RRD file + let result = backend.create(&rrd_path, &schema, start_time).await; + assert!( + result.is_ok(), + "Failed to create node RRD: {:?}", + result.err() + ); + + // Verify file was created + assert!(rrd_path.exists(), "RRD file should exist after create"); + + // Verify backend name + assert_eq!(backend.name(), "direct"); + } + + #[tokio::test] + async fn test_direct_backend_create_vm_rrd() { + let temp_dir = setup_temp_dir(); + let rrd_path = test_rrd_path(&temp_dir, "vm_test"); + + let mut backend = RrdDirectBackend::new(); + let schema = RrdSchema::vm(RrdFormat::Pve9_0); + let start_time = 1704067200; + + let result = backend.create(&rrd_path, &schema, start_time).await; + assert!( + result.is_ok(), + "Failed to create VM RRD: {:?}", + result.err() + ); + assert!(rrd_path.exists()); + } + + #[tokio::test] + async fn test_direct_backend_create_storage_rrd() { + let temp_dir = setup_temp_dir(); + let rrd_path = test_rrd_path(&temp_dir, "storage_test"); + + let mut backend = RrdDirectBackend::new(); + let schema = RrdSchema::storage(RrdFormat::Pve2); + let start_time = 1704067200; + + let result = backend.create(&rrd_path, &schema, start_time).await; + assert!( + result.is_ok(), + "Failed to create storage RRD: {:?}", + result.err() + ); + assert!(rrd_path.exists()); + } + + #[tokio::test] + async fn test_direct_backend_update_with_timestamp() { + let temp_dir = setup_temp_dir(); + let rrd_path = test_rrd_path(&temp_dir, "update_test"); + + let mut backend = RrdDirectBackend::new(); + let schema = RrdSchema::storage(RrdFormat::Pve2); + let start_time = 1704067200; + + // Create RRD file + backend + .create(&rrd_path, &schema, start_time) + .await + .expect("Failed to create RRD"); + + // Update with explicit timestamp and values + // Format: "timestamp:value1:value2" + let update_data = "1704067260:1000000:500000"; // total=1MB, used=500KB + let result = backend.update(&rrd_path, update_data).await; + + assert!(result.is_ok(), "Failed to update RRD: {:?}", result.err()); + } + + #[tokio::test] + async fn test_direct_backend_update_with_n_timestamp() { + let temp_dir = setup_temp_dir(); + let rrd_path = test_rrd_path(&temp_dir, "update_n_test"); + + let mut backend = RrdDirectBackend::new(); + let schema = RrdSchema::storage(RrdFormat::Pve2); + let start_time = 1704067200; + + backend + .create(&rrd_path, &schema, start_time) + .await + .expect("Failed to create RRD"); + + // Update with "N" (current time) timestamp + let update_data = "N:2000000:750000"; + let result = backend.update(&rrd_path, update_data).await; + + assert!( + result.is_ok(), + "Failed to update RRD with N timestamp: {:?}", + result.err() + ); + } + + #[tokio::test] + async fn test_direct_backend_update_with_unknown_values() { + let temp_dir = setup_temp_dir(); + let rrd_path = test_rrd_path(&temp_dir, "update_u_test"); + + let mut backend = RrdDirectBackend::new(); + let schema = RrdSchema::storage(RrdFormat::Pve2); + let start_time = 1704067200; + + backend + .create(&rrd_path, &schema, start_time) + .await + .expect("Failed to create RRD"); + + // Update with "U" (unknown) values + let update_data = "N:U:1000000"; // total unknown, used known + let result = backend.update(&rrd_path, update_data).await; + + assert!( + result.is_ok(), + "Failed to update RRD with U values: {:?}", + result.err() + ); + } + + #[tokio::test] + async fn test_direct_backend_update_invalid_data() { + let temp_dir = setup_temp_dir(); + let rrd_path = test_rrd_path(&temp_dir, "invalid_test"); + + let mut backend = RrdDirectBackend::new(); + let schema = RrdSchema::storage(RrdFormat::Pve2); + let start_time = 1704067200; + + backend + .create(&rrd_path, &schema, start_time) + .await + .expect("Failed to create RRD"); + + // Test truly invalid data formats that MUST fail + // Note: Invalid values like "abc" are converted to Unspecified (U), which is valid RRD behavior + let invalid_cases = vec![ + "", // Empty string + ":", // Only separator + "timestamp", // Missing values + "N", // No colon separator + "abc:123:456", // Invalid timestamp (not N or integer) + ]; + + for invalid_data in invalid_cases { + let result = backend.update(&rrd_path, invalid_data).await; + assert!( + result.is_err(), + "Update should fail for invalid data: '{}', but got Ok", + invalid_data + ); + } + + // Test lenient data formats that succeed (invalid values become Unspecified) + // Use explicit timestamps to avoid "same timestamp" errors + let mut timestamp = start_time + 60; + let lenient_cases = vec![ + "abc:456", // Invalid first value -> becomes U + "123:def", // Invalid second value -> becomes U + "U:U", // All unknown + ]; + + for valid_data in lenient_cases { + let update_data = format!("{}:{}", timestamp, valid_data); + let result = backend.update(&rrd_path, &update_data).await; + assert!( + result.is_ok(), + "Update should succeed for lenient data: '{}', but got Err: {:?}", + update_data, + result.err() + ); + timestamp += 60; // Increment timestamp for next update + } + } + + #[tokio::test] + async fn test_direct_backend_update_nonexistent_file() { + let temp_dir = setup_temp_dir(); + let rrd_path = test_rrd_path(&temp_dir, "nonexistent"); + + let mut backend = RrdDirectBackend::new(); + + // Try to update a file that doesn't exist + let result = backend.update(&rrd_path, "N:100:200").await; + + assert!(result.is_err(), "Update should fail for nonexistent file"); + } + + #[tokio::test] + async fn test_direct_backend_flush() { + let mut backend = RrdDirectBackend::new(); + + // Flush should always succeed for direct backend (no-op) + let result = backend.flush().await; + assert!( + result.is_ok(), + "Flush should always succeed for direct backend" + ); + } + + #[tokio::test] + async fn test_direct_backend_is_available() { + let backend = RrdDirectBackend::new(); + + // Direct backend should always be available + assert!( + backend.is_available().await, + "Direct backend should always be available" + ); + } + + #[tokio::test] + async fn test_direct_backend_multiple_updates() { + let temp_dir = setup_temp_dir(); + let rrd_path = test_rrd_path(&temp_dir, "multi_update_test"); + + let mut backend = RrdDirectBackend::new(); + let schema = RrdSchema::storage(RrdFormat::Pve2); + let start_time = 1704067200; + + backend + .create(&rrd_path, &schema, start_time) + .await + .expect("Failed to create RRD"); + + // Perform multiple updates + for i in 0..10 { + let timestamp = start_time + 60 * (i + 1); // 1 minute intervals + let total = 1000000 + (i * 100000); + let used = 500000 + (i * 50000); + let update_data = format!("{}:{}:{}", timestamp, total, used); + + let result = backend.update(&rrd_path, &update_data).await; + assert!(result.is_ok(), "Update {} failed: {:?}", i, result.err()); + } + } + + #[tokio::test] + async fn test_direct_backend_overwrite_file() { + let temp_dir = setup_temp_dir(); + let rrd_path = test_rrd_path(&temp_dir, "overwrite_test"); + + let mut backend = RrdDirectBackend::new(); + let schema = RrdSchema::storage(RrdFormat::Pve2); + let start_time = 1704067200; + + // Create file first time + backend + .create(&rrd_path, &schema, start_time) + .await + .expect("First create failed"); + + // Create same file again - should succeed (overwrites) + // Note: librrd create() with no_overwrite=false allows overwriting + let result = backend.create(&rrd_path, &schema, start_time).await; + assert!( + result.is_ok(), + "Creating file again should succeed (overwrite mode): {:?}", + result.err() + ); + } + + #[tokio::test] + async fn test_direct_backend_large_schema() { + let temp_dir = setup_temp_dir(); + let rrd_path = test_rrd_path(&temp_dir, "large_schema_test"); + + let mut backend = RrdDirectBackend::new(); + let schema = RrdSchema::node(RrdFormat::Pve9_0); // 19 data sources + let start_time = 1704067200; + + // Create RRD with large schema + let result = backend.create(&rrd_path, &schema, start_time).await; + assert!(result.is_ok(), "Failed to create RRD with large schema"); + + // Update with all values + let values = "100:200:50.5:10.2:8000000:4000000:2000000:500000:50000000:25000000:1000000:2000000:6000000:1000000:0.5:1.2:0.8:0.3:0.1"; + let update_data = format!("N:{}", values); + + let result = backend.update(&rrd_path, &update_data).await; + assert!(result.is_ok(), "Failed to update RRD with large schema"); + } +} diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_fallback.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_fallback.rs new file mode 100644 index 00000000..7d574e5b --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_fallback.rs @@ -0,0 +1,229 @@ +/// RRD Backend: Fallback (Daemon + Direct) +/// +/// Composite backend that tries daemon first, falls back to direct file writing. +/// This matches the C implementation's behavior in status.c:1405-1420 where +/// it attempts rrdc_update() first, then falls back to rrd_update_r(). +use super::super::schema::RrdSchema; +use super::{RrdCachedBackend, RrdDirectBackend}; +use anyhow::{Context, Result}; +use async_trait::async_trait; +use std::path::Path; + +/// Composite backend that tries daemon first, falls back to direct +/// +/// This provides the same behavior as the C implementation: +/// 1. Try to use rrdcached daemon for performance +/// 2. If daemon fails or is unavailable, fall back to direct file writes +pub struct RrdFallbackBackend { + /// Optional daemon backend (None if daemon is unavailable/failed) + daemon: Option<RrdCachedBackend>, + /// Direct backend (always available) + direct: RrdDirectBackend, +} + +impl RrdFallbackBackend { + /// Create a new fallback backend + /// + /// Attempts to connect to rrdcached daemon. If successful, will prefer daemon. + /// If daemon is unavailable, will use direct mode only. + /// + /// # Arguments + /// * `daemon_socket` - Path to rrdcached Unix socket + pub async fn new(daemon_socket: &str) -> Self { + let daemon = match RrdCachedBackend::connect(daemon_socket).await { + Ok(backend) => { + tracing::info!("RRD fallback backend: daemon available, will prefer daemon mode"); + Some(backend) + } + Err(e) => { + tracing::warn!( + "RRD fallback backend: daemon unavailable ({}), using direct mode only", + e + ); + None + } + }; + + let direct = RrdDirectBackend::new(); + + Self { daemon, direct } + } + + /// Create a fallback backend with explicit daemon and direct backends + /// + /// Useful for testing or custom configurations + #[allow(dead_code)] // Used in tests for custom backend configurations + pub fn with_backends(daemon: Option<RrdCachedBackend>, direct: RrdDirectBackend) -> Self { + Self { daemon, direct } + } + + /// Check if daemon is currently being used + #[allow(dead_code)] // Used for debugging/monitoring daemon status + pub fn is_using_daemon(&self) -> bool { + self.daemon.is_some() + } + + /// Disable daemon mode and switch to direct mode only + /// + /// Called automatically when daemon operations fail + fn disable_daemon(&mut self) { + if self.daemon.is_some() { + tracing::warn!("Disabling daemon mode, switching to direct file writes"); + self.daemon = None; + } + } +} + +#[async_trait] +impl super::super::backend::RrdBackend for RrdFallbackBackend { + async fn update(&mut self, file_path: &Path, data: &str) -> Result<()> { + // Try daemon first if available + if let Some(daemon) = &mut self.daemon { + match daemon.update(file_path, data).await { + Ok(()) => { + tracing::trace!("Updated RRD via daemon (fallback backend)"); + return Ok(()); + } + Err(e) => { + tracing::warn!("Daemon update failed, falling back to direct: {}", e); + self.disable_daemon(); + } + } + } + + // Fallback to direct + self.direct + .update(file_path, data) + .await + .context("Both daemon and direct update failed") + } + + async fn create( + &mut self, + file_path: &Path, + schema: &RrdSchema, + start_timestamp: i64, + ) -> Result<()> { + // Try daemon first if available + if let Some(daemon) = &mut self.daemon { + match daemon.create(file_path, schema, start_timestamp).await { + Ok(()) => { + tracing::trace!("Created RRD via daemon (fallback backend)"); + return Ok(()); + } + Err(e) => { + tracing::warn!("Daemon create failed, falling back to direct: {}", e); + self.disable_daemon(); + } + } + } + + // Fallback to direct + self.direct + .create(file_path, schema, start_timestamp) + .await + .context("Both daemon and direct create failed") + } + + async fn flush(&mut self) -> Result<()> { + // Only flush if using daemon + if let Some(daemon) = &mut self.daemon { + match daemon.flush().await { + Ok(()) => return Ok(()), + Err(e) => { + tracing::warn!("Daemon flush failed: {}", e); + self.disable_daemon(); + } + } + } + + // Direct backend flush is a no-op + self.direct.flush().await + } + + async fn is_available(&self) -> bool { + // Always available - either daemon or direct will work + true + } + + fn name(&self) -> &str { + if self.daemon.is_some() { + "fallback(daemon+direct)" + } else { + "fallback(direct-only)" + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::backend::RrdBackend; + use crate::schema::{RrdFormat, RrdSchema}; + use std::path::PathBuf; + use tempfile::TempDir; + + /// Create a temporary directory for RRD files + fn setup_temp_dir() -> TempDir { + TempDir::new().expect("Failed to create temp directory") + } + + /// Create a test RRD file path + fn test_rrd_path(dir: &TempDir, name: &str) -> PathBuf { + dir.path().join(format!("{}.rrd", name)) + } + + #[test] + fn test_fallback_backend_without_daemon() { + let direct = RrdDirectBackend::new(); + let backend = RrdFallbackBackend::with_backends(None, direct); + + assert!(!backend.is_using_daemon()); + assert_eq!(backend.name(), "fallback(direct-only)"); + } + + #[tokio::test] + async fn test_fallback_backend_direct_mode_operations() { + let temp_dir = setup_temp_dir(); + let rrd_path = test_rrd_path(&temp_dir, "fallback_test"); + + // Create fallback backend without daemon (direct mode only) + let direct = RrdDirectBackend::new(); + let mut backend = RrdFallbackBackend::with_backends(None, direct); + + assert!(!backend.is_using_daemon(), "Should not be using daemon"); + assert_eq!(backend.name(), "fallback(direct-only)"); + + // Test create and update operations work in direct mode + let schema = RrdSchema::storage(RrdFormat::Pve2); + let start_time = 1704067200; + + let result = backend.create(&rrd_path, &schema, start_time).await; + assert!(result.is_ok(), "Create should work in direct mode"); + + let result = backend.update(&rrd_path, "N:1000:500").await; + assert!(result.is_ok(), "Update should work in direct mode"); + } + + #[tokio::test] + async fn test_fallback_backend_is_always_available() { + let direct = RrdDirectBackend::new(); + let backend = RrdFallbackBackend::with_backends(None, direct); + + // Fallback backend should always be available (even without daemon) + assert!( + backend.is_available().await, + "Fallback backend should always be available" + ); + } + + #[tokio::test] + async fn test_fallback_backend_flush_without_daemon() { + let direct = RrdDirectBackend::new(); + let mut backend = RrdFallbackBackend::with_backends(None, direct); + + // Flush should succeed even without daemon (no-op for direct) + let result = backend.flush().await; + assert!(result.is_ok(), "Flush should succeed without daemon"); + } +} diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs new file mode 100644 index 00000000..e53b6dad --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs @@ -0,0 +1,140 @@ +/// RRDCached Daemon Client (wrapper around rrdcached-client crate) +/// +/// This module provides a thin wrapper around the rrdcached-client crate. +use anyhow::{Context, Result}; +use std::path::Path; + +/// Wrapper around rrdcached-client +#[allow(dead_code)] // Used in backend_daemon.rs via module-level access +pub struct RrdCachedClient { + pub(crate) client: + tokio::sync::Mutex<rrdcached_client::RRDCachedClient<tokio::net::UnixStream>>, +} + +impl RrdCachedClient { + /// Connect to rrdcached daemon via Unix socket + /// + /// # Arguments + /// * `socket_path` - Path to rrdcached Unix socket (default: /var/run/rrdcached.sock) + #[allow(dead_code)] // Used via backend modules + pub async fn connect<P: AsRef<Path>>(socket_path: P) -> Result<Self> { + let socket_path = socket_path.as_ref().to_string_lossy().to_string(); + + tracing::debug!("Connecting to rrdcached at {}", socket_path); + + // Connect to daemon (async operation) + let client = rrdcached_client::RRDCachedClient::connect_unix(&socket_path) + .await + .with_context(|| format!("Failed to connect to rrdcached: {socket_path}"))?; + + tracing::info!("Connected to rrdcached at {}", socket_path); + + Ok(Self { + client: tokio::sync::Mutex::new(client), + }) + } + + /// Update RRD file via rrdcached + /// + /// # Arguments + /// * `file_path` - Full path to RRD file + /// * `data` - Update data in format "timestamp:value1:value2:..." + #[allow(dead_code)] // Used via backend modules + pub async fn update<P: AsRef<Path>>(&self, file_path: P, data: &str) -> Result<()> { + let file_path = file_path.as_ref(); + + // Parse the update data + let parts: Vec<&str> = data.split(':').collect(); + if parts.len() < 2 { + anyhow::bail!("Invalid update data format: {data}"); + } + + let timestamp = if parts[0] == "N" { + None + } else { + Some( + parts[0] + .parse::<usize>() + .with_context(|| format!("Invalid timestamp: {}", parts[0]))?, + ) + }; + + let values: Vec<f64> = parts[1..] + .iter() + .map(|v| { + if *v == "U" { + Ok(f64::NAN) + } else { + v.parse::<f64>() + .with_context(|| format!("Invalid value: {v}")) + } + }) + .collect::<Result<Vec<_>>>()?; + + // Get file path without .rrd extension (rrdcached-client adds it) + let path_str = file_path.to_string_lossy(); + let path_without_ext = path_str.strip_suffix(".rrd").unwrap_or(&path_str); + + // Send update via rrdcached + let mut client = self.client.lock().await; + client + .update(path_without_ext, timestamp, values) + .await + .context("Failed to send update to rrdcached")?; + + tracing::trace!("Updated RRD via daemon: {:?} -> {}", file_path, data); + + Ok(()) + } + + /// Create RRD file via rrdcached + #[allow(dead_code)] // Used via backend modules + pub async fn create(&self, args: rrdcached_client::create::CreateArguments) -> Result<()> { + let mut client = self.client.lock().await; + client + .create(args) + .await + .context("Failed to create RRD via rrdcached")?; + Ok(()) + } + + /// Flush all pending updates + #[allow(dead_code)] // Used via backend modules + pub async fn flush(&self) -> Result<()> { + let mut client = self.client.lock().await; + client + .flush_all() + .await + .context("Failed to flush rrdcached")?; + + tracing::debug!("Flushed all RRD files"); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + #[ignore] // Only runs if rrdcached daemon is actually running + async fn test_connect_to_daemon() { + // This test requires a running rrdcached daemon + let result = RrdCachedClient::connect("/var/run/rrdcached.sock").await; + + match result { + Ok(client) => { + // Try to flush (basic connectivity test) + let result = client.flush().await; + println!("RRDCached flush result: {:?}", result); + + // Connection successful (flush may fail if no files, that's OK) + assert!(result.is_ok() || result.is_err()); + } + Err(e) => { + println!("Note: rrdcached not running (expected in test env): {}", e); + } + } + } +} diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs new file mode 100644 index 00000000..54021c14 --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs @@ -0,0 +1,313 @@ +/// RRD Key Type Parsing and Path Resolution +/// +/// This module handles parsing RRD status update keys and mapping them +/// to the appropriate file paths and schemas. +use anyhow::{Context, Result}; +use std::path::{Path, PathBuf}; + +use super::schema::{RrdFormat, RrdSchema}; + +/// RRD key types for routing to correct schema and path +/// +/// This enum represents the different types of RRD metrics that pmxcfs tracks: +/// - Node metrics (CPU, memory, network for a node) +/// - VM metrics (CPU, memory, disk, network for a VM/CT) +/// - Storage metrics (total/used space for a storage) +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum RrdKeyType { + /// Node metrics: pve2-node/{nodename} or pve-node-9.0/{nodename} + Node { nodename: String, format: RrdFormat }, + /// VM metrics: pve2.3-vm/{vmid} or pve-vm-9.0/{vmid} + Vm { vmid: String, format: RrdFormat }, + /// Storage metrics: pve2-storage/{node}/{storage} or pve-storage-9.0/{node}/{storage} + Storage { + nodename: String, + storage: String, + format: RrdFormat, + }, +} + +impl RrdKeyType { + /// Parse RRD key from status update key + /// + /// Supported formats: + /// - "pve2-node/node1" → Node { nodename: "node1", format: Pve2 } + /// - "pve-node-9.0/node1" → Node { nodename: "node1", format: Pve9_0 } + /// - "pve2.3-vm/100" → Vm { vmid: "100", format: Pve2 } + /// - "pve-storage-9.0/node1/local" → Storage { nodename: "node1", storage: "local", format: Pve9_0 } + pub(crate) fn parse(key: &str) -> Result<Self> { + let parts: Vec<&str> = key.split('/').collect(); + + if parts.is_empty() { + anyhow::bail!("Empty RRD key"); + } + + match parts[0] { + "pve2-node" => { + let nodename = parts.get(1).context("Missing nodename")?.to_string(); + Ok(RrdKeyType::Node { + nodename, + format: RrdFormat::Pve2, + }) + } + prefix if prefix.starts_with("pve-node-") => { + let nodename = parts.get(1).context("Missing nodename")?.to_string(); + Ok(RrdKeyType::Node { + nodename, + format: RrdFormat::Pve9_0, + }) + } + "pve2.3-vm" => { + let vmid = parts.get(1).context("Missing vmid")?.to_string(); + Ok(RrdKeyType::Vm { + vmid, + format: RrdFormat::Pve2, + }) + } + prefix if prefix.starts_with("pve-vm-") => { + let vmid = parts.get(1).context("Missing vmid")?.to_string(); + Ok(RrdKeyType::Vm { + vmid, + format: RrdFormat::Pve9_0, + }) + } + "pve2-storage" => { + let nodename = parts.get(1).context("Missing nodename")?.to_string(); + let storage = parts.get(2).context("Missing storage")?.to_string(); + Ok(RrdKeyType::Storage { + nodename, + storage, + format: RrdFormat::Pve2, + }) + } + prefix if prefix.starts_with("pve-storage-") => { + let nodename = parts.get(1).context("Missing nodename")?.to_string(); + let storage = parts.get(2).context("Missing storage")?.to_string(); + Ok(RrdKeyType::Storage { + nodename, + storage, + format: RrdFormat::Pve9_0, + }) + } + _ => anyhow::bail!("Unknown RRD key format: {key}"), + } + } + + /// Get the RRD file path for this key type + /// + /// Always returns paths using the current format (9.0), regardless of the input format. + /// This enables transparent format migration: old PVE8 nodes can send `pve2-node/` keys, + /// and they'll be written to `pve-node-9.0/` files automatically. + /// + /// # Format Migration Strategy + /// + /// The C implementation always creates files in the current format directory + /// (see status.c:1287). This Rust implementation follows the same approach: + /// - Input: `pve2-node/node1` → Output: `/var/lib/rrdcached/db/pve-node-9.0/node1` + /// - Input: `pve-node-9.0/node1` → Output: `/var/lib/rrdcached/db/pve-node-9.0/node1` + /// + /// This allows rolling upgrades where old and new nodes coexist in the same cluster. + pub(crate) fn file_path(&self, base_dir: &Path) -> PathBuf { + match self { + RrdKeyType::Node { nodename, .. } => { + // Always use current format path + base_dir.join("pve-node-9.0").join(nodename) + } + RrdKeyType::Vm { vmid, .. } => { + // Always use current format path + base_dir.join("pve-vm-9.0").join(vmid) + } + RrdKeyType::Storage { + nodename, storage, .. + } => { + // Always use current format path + base_dir + .join("pve-storage-9.0") + .join(nodename) + .join(storage) + } + } + } + + /// Get the source format from the input key + /// + /// This is used for data transformation (padding/truncation). + pub(crate) fn source_format(&self) -> RrdFormat { + match self { + RrdKeyType::Node { format, .. } + | RrdKeyType::Vm { format, .. } + | RrdKeyType::Storage { format, .. } => *format, + } + } + + /// Get the target RRD schema (always current format) + /// + /// Files are always created using the current format (Pve9_0), + /// regardless of the source format in the key. + pub(crate) fn schema(&self) -> RrdSchema { + match self { + RrdKeyType::Node { .. } => RrdSchema::node(RrdFormat::Pve9_0), + RrdKeyType::Vm { .. } => RrdSchema::vm(RrdFormat::Pve9_0), + RrdKeyType::Storage { .. } => RrdSchema::storage(RrdFormat::Pve9_0), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_node_keys() { + let key = RrdKeyType::parse("pve2-node/testnode").unwrap(); + assert_eq!( + key, + RrdKeyType::Node { + nodename: "testnode".to_string(), + format: RrdFormat::Pve2 + } + ); + + let key = RrdKeyType::parse("pve-node-9.0/testnode").unwrap(); + assert_eq!( + key, + RrdKeyType::Node { + nodename: "testnode".to_string(), + format: RrdFormat::Pve9_0 + } + ); + } + + #[test] + fn test_parse_vm_keys() { + let key = RrdKeyType::parse("pve2.3-vm/100").unwrap(); + assert_eq!( + key, + RrdKeyType::Vm { + vmid: "100".to_string(), + format: RrdFormat::Pve2 + } + ); + + let key = RrdKeyType::parse("pve-vm-9.0/100").unwrap(); + assert_eq!( + key, + RrdKeyType::Vm { + vmid: "100".to_string(), + format: RrdFormat::Pve9_0 + } + ); + } + + #[test] + fn test_parse_storage_keys() { + let key = RrdKeyType::parse("pve2-storage/node1/local").unwrap(); + assert_eq!( + key, + RrdKeyType::Storage { + nodename: "node1".to_string(), + storage: "local".to_string(), + format: RrdFormat::Pve2 + } + ); + + let key = RrdKeyType::parse("pve-storage-9.0/node1/local").unwrap(); + assert_eq!( + key, + RrdKeyType::Storage { + nodename: "node1".to_string(), + storage: "local".to_string(), + format: RrdFormat::Pve9_0 + } + ); + } + + #[test] + fn test_file_paths() { + let base = Path::new("/var/lib/rrdcached/db"); + + // New format key → new format path + let key = RrdKeyType::Node { + nodename: "node1".to_string(), + format: RrdFormat::Pve9_0, + }; + assert_eq!( + key.file_path(base), + PathBuf::from("/var/lib/rrdcached/db/pve-node-9.0/node1") + ); + + // Old format key → new format path (auto-upgrade!) + let key = RrdKeyType::Node { + nodename: "node1".to_string(), + format: RrdFormat::Pve2, + }; + assert_eq!( + key.file_path(base), + PathBuf::from("/var/lib/rrdcached/db/pve-node-9.0/node1"), + "Old format keys should create new format files" + ); + + // VM: Old format → new format + let key = RrdKeyType::Vm { + vmid: "100".to_string(), + format: RrdFormat::Pve2, + }; + assert_eq!( + key.file_path(base), + PathBuf::from("/var/lib/rrdcached/db/pve-vm-9.0/100"), + "Old VM format should upgrade to new format" + ); + + // Storage: Always uses current format + let key = RrdKeyType::Storage { + nodename: "node1".to_string(), + storage: "local".to_string(), + format: RrdFormat::Pve2, + }; + assert_eq!( + key.file_path(base), + PathBuf::from("/var/lib/rrdcached/db/pve-storage-9.0/node1/local"), + "Old storage format should upgrade to new format" + ); + } + + #[test] + fn test_source_format() { + let key = RrdKeyType::Node { + nodename: "node1".to_string(), + format: RrdFormat::Pve2, + }; + assert_eq!(key.source_format(), RrdFormat::Pve2); + + let key = RrdKeyType::Vm { + vmid: "100".to_string(), + format: RrdFormat::Pve9_0, + }; + assert_eq!(key.source_format(), RrdFormat::Pve9_0); + } + + #[test] + fn test_schema_always_current_format() { + // Even with Pve2 source format, schema should return Pve9_0 + let key = RrdKeyType::Node { + nodename: "node1".to_string(), + format: RrdFormat::Pve2, + }; + let schema = key.schema(); + assert_eq!( + schema.format, + RrdFormat::Pve9_0, + "Schema should always use current format" + ); + assert_eq!(schema.column_count(), 19, "Should have Pve9_0 column count"); + + // Pve9_0 source also gets Pve9_0 schema + let key = RrdKeyType::Node { + nodename: "node1".to_string(), + format: RrdFormat::Pve9_0, + }; + let schema = key.schema(); + assert_eq!(schema.format, RrdFormat::Pve9_0); + assert_eq!(schema.column_count(), 19); + } +} diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs new file mode 100644 index 00000000..7a439676 --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs @@ -0,0 +1,21 @@ +/// RRD (Round-Robin Database) Persistence Module +/// +/// This module provides RRD file persistence compatible with the C pmxcfs implementation. +/// It handles: +/// - RRD file creation with proper schemas (node, VM, storage) +/// - RRD file updates (writing metrics to disk) +/// - Multiple backend strategies: +/// - Daemon mode: High-performance batched updates via rrdcached +/// - Direct mode: Reliable fallback using direct file writes +/// - Fallback mode: Tries daemon first, falls back to direct (matches C behavior) +/// - Version management (pve2 vs pve-9.0 formats) +/// +/// The implementation matches the C behavior in status.c where it attempts +/// daemon updates first, then falls back to direct file operations. +mod backend; +mod daemon; +mod key_type; +pub(crate) mod schema; +mod writer; + +pub use writer::RrdWriter; diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs new file mode 100644 index 00000000..d449bd6e --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs @@ -0,0 +1,577 @@ +/// RRD Schema Definitions +/// +/// Defines RRD database schemas matching the C pmxcfs implementation. +/// Each schema specifies data sources (DS) and round-robin archives (RRA). +use std::fmt; + +/// RRD format version +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RrdFormat { + /// Legacy pve2 format (12 columns for node, 10 for VM, 2 for storage) + Pve2, + /// New pve-9.0 format (19 columns for node, 17 for VM, 2 for storage) + Pve9_0, +} + +/// RRD data source definition +#[derive(Debug, Clone)] +pub struct RrdDataSource { + /// Data source name + pub name: &'static str, + /// Data source type (GAUGE, COUNTER, DERIVE, ABSOLUTE) + pub ds_type: &'static str, + /// Heartbeat (seconds before marking as unknown) + pub heartbeat: u32, + /// Minimum value (U for unknown) + pub min: &'static str, + /// Maximum value (U for unknown) + pub max: &'static str, +} + +impl RrdDataSource { + /// Create GAUGE data source with no min/max limits + pub(super) const fn gauge(name: &'static str) -> Self { + Self { + name, + ds_type: "GAUGE", + heartbeat: 120, + min: "0", + max: "U", + } + } + + /// Create DERIVE data source (for counters that can wrap) + pub(super) const fn derive(name: &'static str) -> Self { + Self { + name, + ds_type: "DERIVE", + heartbeat: 120, + min: "0", + max: "U", + } + } + + /// Format as RRD command line argument + /// + /// Matches C implementation format: "DS:name:TYPE:heartbeat:min:max" + /// (see rrd_def_node in src/pmxcfs/status.c:1100) + /// + /// Currently unused but kept for debugging/testing and C format compatibility. + #[allow(dead_code)] + pub(super) fn to_arg(&self) -> String { + format!( + "DS:{}:{}:{}:{}:{}", + self.name, self.ds_type, self.heartbeat, self.min, self.max + ) + } +} + +/// RRD schema with data sources and archives +#[derive(Debug, Clone)] +pub struct RrdSchema { + /// RRD format version + pub format: RrdFormat, + /// Data sources + pub data_sources: Vec<RrdDataSource>, + /// Round-robin archives (RRA definitions) + pub archives: Vec<String>, +} + +impl RrdSchema { + /// Create node RRD schema + pub fn node(format: RrdFormat) -> Self { + let data_sources = match format { + RrdFormat::Pve2 => vec![ + RrdDataSource::gauge("loadavg"), + RrdDataSource::gauge("maxcpu"), + RrdDataSource::gauge("cpu"), + RrdDataSource::gauge("iowait"), + RrdDataSource::gauge("memtotal"), + RrdDataSource::gauge("memused"), + RrdDataSource::gauge("swaptotal"), + RrdDataSource::gauge("swapused"), + RrdDataSource::gauge("roottotal"), + RrdDataSource::gauge("rootused"), + RrdDataSource::derive("netin"), + RrdDataSource::derive("netout"), + ], + RrdFormat::Pve9_0 => vec![ + RrdDataSource::gauge("loadavg"), + RrdDataSource::gauge("maxcpu"), + RrdDataSource::gauge("cpu"), + RrdDataSource::gauge("iowait"), + RrdDataSource::gauge("memtotal"), + RrdDataSource::gauge("memused"), + RrdDataSource::gauge("swaptotal"), + RrdDataSource::gauge("swapused"), + RrdDataSource::gauge("roottotal"), + RrdDataSource::gauge("rootused"), + RrdDataSource::derive("netin"), + RrdDataSource::derive("netout"), + RrdDataSource::gauge("memavailable"), + RrdDataSource::gauge("arcsize"), + RrdDataSource::gauge("pressurecpusome"), + RrdDataSource::gauge("pressureiosome"), + RrdDataSource::gauge("pressureiofull"), + RrdDataSource::gauge("pressurememorysome"), + RrdDataSource::gauge("pressurememoryfull"), + ], + }; + + Self { + format, + data_sources, + archives: Self::default_archives(), + } + } + + /// Create VM RRD schema + pub fn vm(format: RrdFormat) -> Self { + let data_sources = match format { + RrdFormat::Pve2 => vec![ + RrdDataSource::gauge("maxcpu"), + RrdDataSource::gauge("cpu"), + RrdDataSource::gauge("maxmem"), + RrdDataSource::gauge("mem"), + RrdDataSource::gauge("maxdisk"), + RrdDataSource::gauge("disk"), + RrdDataSource::derive("netin"), + RrdDataSource::derive("netout"), + RrdDataSource::derive("diskread"), + RrdDataSource::derive("diskwrite"), + ], + RrdFormat::Pve9_0 => vec![ + RrdDataSource::gauge("maxcpu"), + RrdDataSource::gauge("cpu"), + RrdDataSource::gauge("maxmem"), + RrdDataSource::gauge("mem"), + RrdDataSource::gauge("maxdisk"), + RrdDataSource::gauge("disk"), + RrdDataSource::derive("netin"), + RrdDataSource::derive("netout"), + RrdDataSource::derive("diskread"), + RrdDataSource::derive("diskwrite"), + RrdDataSource::gauge("memhost"), + RrdDataSource::gauge("pressurecpusome"), + RrdDataSource::gauge("pressurecpufull"), + RrdDataSource::gauge("pressureiosome"), + RrdDataSource::gauge("pressureiofull"), + RrdDataSource::gauge("pressurememorysome"), + RrdDataSource::gauge("pressurememoryfull"), + ], + }; + + Self { + format, + data_sources, + archives: Self::default_archives(), + } + } + + /// Create storage RRD schema + pub fn storage(format: RrdFormat) -> Self { + let data_sources = vec![RrdDataSource::gauge("total"), RrdDataSource::gauge("used")]; + + Self { + format, + data_sources, + archives: Self::default_archives(), + } + } + + /// Default RRA (Round-Robin Archive) definitions + /// + /// These match the C implementation's archives for 60-second step size: + /// - RRA:AVERAGE:0.5:1:1440 -> 1 min * 1440 => 1 day + /// - RRA:AVERAGE:0.5:30:1440 -> 30 min * 1440 => 30 days + /// - RRA:AVERAGE:0.5:360:1440 -> 6 hours * 1440 => 360 days (~1 year) + /// - RRA:AVERAGE:0.5:10080:570 -> 1 week * 570 => ~10 years + /// - RRA:MAX:0.5:1:1440 -> 1 min * 1440 => 1 day + /// - RRA:MAX:0.5:30:1440 -> 30 min * 1440 => 30 days + /// - RRA:MAX:0.5:360:1440 -> 6 hours * 1440 => 360 days (~1 year) + /// - RRA:MAX:0.5:10080:570 -> 1 week * 570 => ~10 years + pub(super) fn default_archives() -> Vec<String> { + vec![ + "RRA:AVERAGE:0.5:1:1440".to_string(), + "RRA:AVERAGE:0.5:30:1440".to_string(), + "RRA:AVERAGE:0.5:360:1440".to_string(), + "RRA:AVERAGE:0.5:10080:570".to_string(), + "RRA:MAX:0.5:1:1440".to_string(), + "RRA:MAX:0.5:30:1440".to_string(), + "RRA:MAX:0.5:360:1440".to_string(), + "RRA:MAX:0.5:10080:570".to_string(), + ] + } + + /// Get number of data sources + pub fn column_count(&self) -> usize { + self.data_sources.len() + } +} + +impl fmt::Display for RrdSchema { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{:?} schema with {} data sources", + self.format, + self.column_count() + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn assert_ds_properties( + ds: &RrdDataSource, + expected_name: &str, + expected_type: &str, + index: usize, + ) { + assert_eq!(ds.name, expected_name, "DS[{}] name mismatch", index); + assert_eq!(ds.ds_type, expected_type, "DS[{}] type mismatch", index); + assert_eq!(ds.heartbeat, 120, "DS[{}] heartbeat should be 120", index); + assert_eq!(ds.min, "0", "DS[{}] min should be 0", index); + assert_eq!(ds.max, "U", "DS[{}] max should be U", index); + } + + #[test] + fn test_datasource_construction() { + let gauge_ds = RrdDataSource::gauge("cpu"); + assert_eq!(gauge_ds.name, "cpu"); + assert_eq!(gauge_ds.ds_type, "GAUGE"); + assert_eq!(gauge_ds.heartbeat, 120); + assert_eq!(gauge_ds.min, "0"); + assert_eq!(gauge_ds.max, "U"); + assert_eq!(gauge_ds.to_arg(), "DS:cpu:GAUGE:120:0:U"); + + let derive_ds = RrdDataSource::derive("netin"); + assert_eq!(derive_ds.name, "netin"); + assert_eq!(derive_ds.ds_type, "DERIVE"); + assert_eq!(derive_ds.heartbeat, 120); + assert_eq!(derive_ds.min, "0"); + assert_eq!(derive_ds.max, "U"); + assert_eq!(derive_ds.to_arg(), "DS:netin:DERIVE:120:0:U"); + } + + #[test] + fn test_node_schema_pve2() { + let schema = RrdSchema::node(RrdFormat::Pve2); + + assert_eq!(schema.column_count(), 12); + assert_eq!(schema.format, RrdFormat::Pve2); + + let expected_ds = vec![ + ("loadavg", "GAUGE"), + ("maxcpu", "GAUGE"), + ("cpu", "GAUGE"), + ("iowait", "GAUGE"), + ("memtotal", "GAUGE"), + ("memused", "GAUGE"), + ("swaptotal", "GAUGE"), + ("swapused", "GAUGE"), + ("roottotal", "GAUGE"), + ("rootused", "GAUGE"), + ("netin", "DERIVE"), + ("netout", "DERIVE"), + ]; + + for (i, (name, ds_type)) in expected_ds.iter().enumerate() { + assert_ds_properties(&schema.data_sources[i], name, ds_type, i); + } + } + + #[test] + fn test_node_schema_pve9() { + let schema = RrdSchema::node(RrdFormat::Pve9_0); + + assert_eq!(schema.column_count(), 19); + assert_eq!(schema.format, RrdFormat::Pve9_0); + + let pve2_schema = RrdSchema::node(RrdFormat::Pve2); + for i in 0..12 { + assert_eq!( + schema.data_sources[i].name, pve2_schema.data_sources[i].name, + "First 12 DS should match pve2" + ); + assert_eq!( + schema.data_sources[i].ds_type, pve2_schema.data_sources[i].ds_type, + "First 12 DS types should match pve2" + ); + } + + let pve9_additions = vec![ + ("memavailable", "GAUGE"), + ("arcsize", "GAUGE"), + ("pressurecpusome", "GAUGE"), + ("pressureiosome", "GAUGE"), + ("pressureiofull", "GAUGE"), + ("pressurememorysome", "GAUGE"), + ("pressurememoryfull", "GAUGE"), + ]; + + for (i, (name, ds_type)) in pve9_additions.iter().enumerate() { + assert_ds_properties(&schema.data_sources[12 + i], name, ds_type, 12 + i); + } + } + + #[test] + fn test_vm_schema_pve2() { + let schema = RrdSchema::vm(RrdFormat::Pve2); + + assert_eq!(schema.column_count(), 10); + assert_eq!(schema.format, RrdFormat::Pve2); + + let expected_ds = vec![ + ("maxcpu", "GAUGE"), + ("cpu", "GAUGE"), + ("maxmem", "GAUGE"), + ("mem", "GAUGE"), + ("maxdisk", "GAUGE"), + ("disk", "GAUGE"), + ("netin", "DERIVE"), + ("netout", "DERIVE"), + ("diskread", "DERIVE"), + ("diskwrite", "DERIVE"), + ]; + + for (i, (name, ds_type)) in expected_ds.iter().enumerate() { + assert_ds_properties(&schema.data_sources[i], name, ds_type, i); + } + } + + #[test] + fn test_vm_schema_pve9() { + let schema = RrdSchema::vm(RrdFormat::Pve9_0); + + assert_eq!(schema.column_count(), 17); + assert_eq!(schema.format, RrdFormat::Pve9_0); + + let pve2_schema = RrdSchema::vm(RrdFormat::Pve2); + for i in 0..10 { + assert_eq!( + schema.data_sources[i].name, pve2_schema.data_sources[i].name, + "First 10 DS should match pve2" + ); + assert_eq!( + schema.data_sources[i].ds_type, pve2_schema.data_sources[i].ds_type, + "First 10 DS types should match pve2" + ); + } + + let pve9_additions = vec![ + ("memhost", "GAUGE"), + ("pressurecpusome", "GAUGE"), + ("pressurecpufull", "GAUGE"), + ("pressureiosome", "GAUGE"), + ("pressureiofull", "GAUGE"), + ("pressurememorysome", "GAUGE"), + ("pressurememoryfull", "GAUGE"), + ]; + + for (i, (name, ds_type)) in pve9_additions.iter().enumerate() { + assert_ds_properties(&schema.data_sources[10 + i], name, ds_type, 10 + i); + } + } + + #[test] + fn test_storage_schema() { + for format in [RrdFormat::Pve2, RrdFormat::Pve9_0] { + let schema = RrdSchema::storage(format); + + assert_eq!(schema.column_count(), 2); + assert_eq!(schema.format, format); + + assert_ds_properties(&schema.data_sources[0], "total", "GAUGE", 0); + assert_ds_properties(&schema.data_sources[1], "used", "GAUGE", 1); + } + } + + #[test] + fn test_rra_archives() { + let expected_rras = [ + "RRA:AVERAGE:0.5:1:1440", + "RRA:AVERAGE:0.5:30:1440", + "RRA:AVERAGE:0.5:360:1440", + "RRA:AVERAGE:0.5:10080:570", + "RRA:MAX:0.5:1:1440", + "RRA:MAX:0.5:30:1440", + "RRA:MAX:0.5:360:1440", + "RRA:MAX:0.5:10080:570", + ]; + + let schemas = vec![ + RrdSchema::node(RrdFormat::Pve2), + RrdSchema::node(RrdFormat::Pve9_0), + RrdSchema::vm(RrdFormat::Pve2), + RrdSchema::vm(RrdFormat::Pve9_0), + RrdSchema::storage(RrdFormat::Pve2), + RrdSchema::storage(RrdFormat::Pve9_0), + ]; + + for schema in schemas { + assert_eq!(schema.archives.len(), 8); + + for (i, expected) in expected_rras.iter().enumerate() { + assert_eq!( + &schema.archives[i], expected, + "RRA[{}] mismatch in {:?}", + i, schema.format + ); + } + } + } + + #[test] + fn test_heartbeat_consistency() { + let schemas = vec![ + RrdSchema::node(RrdFormat::Pve2), + RrdSchema::node(RrdFormat::Pve9_0), + RrdSchema::vm(RrdFormat::Pve2), + RrdSchema::vm(RrdFormat::Pve9_0), + RrdSchema::storage(RrdFormat::Pve2), + RrdSchema::storage(RrdFormat::Pve9_0), + ]; + + for schema in schemas { + for ds in &schema.data_sources { + assert_eq!(ds.heartbeat, 120); + assert_eq!(ds.min, "0"); + assert_eq!(ds.max, "U"); + } + } + } + + #[test] + fn test_gauge_vs_derive_correctness() { + // GAUGE: instantaneous values (CPU%, memory bytes) + // DERIVE: cumulative counters that can wrap (network/disk bytes) + + let node = RrdSchema::node(RrdFormat::Pve2); + let node_derive_indices = [10, 11]; // netin, netout + for (i, ds) in node.data_sources.iter().enumerate() { + if node_derive_indices.contains(&i) { + assert_eq!( + ds.ds_type, "DERIVE", + "Node DS[{}] ({}) should be DERIVE", + i, ds.name + ); + } else { + assert_eq!( + ds.ds_type, "GAUGE", + "Node DS[{}] ({}) should be GAUGE", + i, ds.name + ); + } + } + + let vm = RrdSchema::vm(RrdFormat::Pve2); + let vm_derive_indices = [6, 7, 8, 9]; // netin, netout, diskread, diskwrite + for (i, ds) in vm.data_sources.iter().enumerate() { + if vm_derive_indices.contains(&i) { + assert_eq!( + ds.ds_type, "DERIVE", + "VM DS[{}] ({}) should be DERIVE", + i, ds.name + ); + } else { + assert_eq!( + ds.ds_type, "GAUGE", + "VM DS[{}] ({}) should be GAUGE", + i, ds.name + ); + } + } + + let storage = RrdSchema::storage(RrdFormat::Pve2); + for ds in &storage.data_sources { + assert_eq!( + ds.ds_type, "GAUGE", + "Storage DS ({}) should be GAUGE", + ds.name + ); + } + } + + #[test] + fn test_pve9_backward_compatibility() { + let node_pve2 = RrdSchema::node(RrdFormat::Pve2); + let node_pve9 = RrdSchema::node(RrdFormat::Pve9_0); + + assert!(node_pve9.column_count() > node_pve2.column_count()); + + for i in 0..node_pve2.column_count() { + assert_eq!( + node_pve2.data_sources[i].name, node_pve9.data_sources[i].name, + "Node DS[{}] name must match between pve2 and pve9.0", + i + ); + assert_eq!( + node_pve2.data_sources[i].ds_type, node_pve9.data_sources[i].ds_type, + "Node DS[{}] type must match between pve2 and pve9.0", + i + ); + } + + let vm_pve2 = RrdSchema::vm(RrdFormat::Pve2); + let vm_pve9 = RrdSchema::vm(RrdFormat::Pve9_0); + + assert!(vm_pve9.column_count() > vm_pve2.column_count()); + + for i in 0..vm_pve2.column_count() { + assert_eq!( + vm_pve2.data_sources[i].name, vm_pve9.data_sources[i].name, + "VM DS[{}] name must match between pve2 and pve9.0", + i + ); + assert_eq!( + vm_pve2.data_sources[i].ds_type, vm_pve9.data_sources[i].ds_type, + "VM DS[{}] type must match between pve2 and pve9.0", + i + ); + } + + let storage_pve2 = RrdSchema::storage(RrdFormat::Pve2); + let storage_pve9 = RrdSchema::storage(RrdFormat::Pve9_0); + assert_eq!(storage_pve2.column_count(), storage_pve9.column_count()); + } + + #[test] + fn test_schema_display() { + let test_cases = vec![ + (RrdSchema::node(RrdFormat::Pve2), "Pve2", "12 data sources"), + ( + RrdSchema::node(RrdFormat::Pve9_0), + "Pve9_0", + "19 data sources", + ), + (RrdSchema::vm(RrdFormat::Pve2), "Pve2", "10 data sources"), + ( + RrdSchema::vm(RrdFormat::Pve9_0), + "Pve9_0", + "17 data sources", + ), + ( + RrdSchema::storage(RrdFormat::Pve2), + "Pve2", + "2 data sources", + ), + ]; + + for (schema, expected_format, expected_count) in test_cases { + let display = format!("{}", schema); + assert!( + display.contains(expected_format), + "Display should contain format: {}", + display + ); + assert!( + display.contains(expected_count), + "Display should contain count: {}", + display + ); + } + } +} diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs new file mode 100644 index 00000000..79ed202a --- /dev/null +++ b/src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs @@ -0,0 +1,397 @@ +/// RRD File Writer +/// +/// Handles creating and updating RRD files via pluggable backends. +/// Supports daemon-based (rrdcached) and direct file writing modes. +use super::key_type::RrdKeyType; +use super::schema::{RrdFormat, RrdSchema}; +use anyhow::{Context, Result}; +use chrono::Utc; +use std::collections::HashMap; +use std::fs; +use std::path::{Path, PathBuf}; + +/// Metric type for determining column skipping rules +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum MetricType { + Node, + Vm, + Storage, +} + +impl MetricType { + /// Number of non-archivable columns to skip + /// + /// C implementation (status.c:1300, 1335): + /// - Node: skip 2 (uptime, status) + /// - VM: skip 4 (uptime, status, template, pid) + /// - Storage: skip 0 + fn skip_columns(self) -> usize { + match self { + MetricType::Node => 2, + MetricType::Vm => 4, + MetricType::Storage => 0, + } + } +} + +impl RrdFormat { + /// Get column count for a specific metric type + #[allow(dead_code)] + fn column_count(self, metric_type: &MetricType) -> usize { + match (self, metric_type) { + (RrdFormat::Pve2, MetricType::Node) => 12, + (RrdFormat::Pve9_0, MetricType::Node) => 19, + (RrdFormat::Pve2, MetricType::Vm) => 10, + (RrdFormat::Pve9_0, MetricType::Vm) => 17, + (_, MetricType::Storage) => 2, // Same for both formats + } + } +} + +impl RrdKeyType { + /// Get the metric type for this key + fn metric_type(&self) -> MetricType { + match self { + RrdKeyType::Node { .. } => MetricType::Node, + RrdKeyType::Vm { .. } => MetricType::Vm, + RrdKeyType::Storage { .. } => MetricType::Storage, + } + } +} + +/// RRD writer for persistent metric storage +/// +/// Uses pluggable backends (daemon, direct, or fallback) for RRD operations. +pub struct RrdWriter { + /// Base directory for RRD files (default: /var/lib/rrdcached/db) + base_dir: PathBuf, + /// Backend for RRD operations (daemon, direct, or fallback) + backend: Box<dyn super::backend::RrdBackend>, + /// Track which RRD files we've already created + created_files: HashMap<String, ()>, +} + +impl RrdWriter { + /// Create new RRD writer with default fallback backend + /// + /// Uses the fallback backend that tries daemon first, then falls back to direct file writes. + /// This matches the C implementation's behavior. + /// + /// # Arguments + /// * `base_dir` - Base directory for RRD files + pub async fn new<P: AsRef<Path>>(base_dir: P) -> Result<Self> { + let backend = Self::default_backend().await?; + Self::with_backend(base_dir, backend).await + } + + /// Create new RRD writer with specific backend + /// + /// # Arguments + /// * `base_dir` - Base directory for RRD files + /// * `backend` - RRD backend to use (daemon, direct, or fallback) + pub(crate) async fn with_backend<P: AsRef<Path>>( + base_dir: P, + backend: Box<dyn super::backend::RrdBackend>, + ) -> Result<Self> { + let base_dir = base_dir.as_ref().to_path_buf(); + + // Create base directory if it doesn't exist + fs::create_dir_all(&base_dir) + .with_context(|| format!("Failed to create RRD base directory: {base_dir:?}"))?; + + tracing::info!("RRD writer using backend: {}", backend.name()); + + Ok(Self { + base_dir, + backend, + created_files: HashMap::new(), + }) + } + + /// Create default backend (fallback: daemon + direct) + /// + /// This matches the C implementation's behavior: + /// - Tries rrdcached daemon first for performance + /// - Falls back to direct file writes if daemon fails + async fn default_backend() -> Result<Box<dyn super::backend::RrdBackend>> { + let backend = super::backend::RrdFallbackBackend::new("/var/run/rrdcached.sock").await; + Ok(Box::new(backend)) + } + + /// Update RRD file with metric data + /// + /// This will: + /// 1. Transform data from source format to target format (padding/truncation/column skipping) + /// 2. Create the RRD file if it doesn't exist + /// 3. Update via rrdcached daemon + /// + /// # Arguments + /// * `key` - RRD key (e.g., "pve2-node/node1", "pve-vm-9.0/100") + /// * `data` - Metric data string (format: "timestamp:value1:value2:...") + pub async fn update(&mut self, key: &str, data: &str) -> Result<()> { + // Parse the key to determine file path and schema + let key_type = RrdKeyType::parse(key).with_context(|| format!("Invalid RRD key: {key}"))?; + + // Get source format and target schema + let source_format = key_type.source_format(); + let target_schema = key_type.schema(); + let metric_type = key_type.metric_type(); + + // Transform data from source to target format + let transformed_data = + Self::transform_data(data, source_format, &target_schema, metric_type) + .with_context(|| format!("Failed to transform RRD data for key: {key}"))?; + + // Get the file path (always uses current format) + let file_path = key_type.file_path(&self.base_dir); + + // Ensure the RRD file exists + if !self.created_files.contains_key(key) && !file_path.exists() { + self.create_rrd_file(&key_type, &file_path).await?; + self.created_files.insert(key.to_string(), ()); + } + + // Update the RRD file via backend + self.backend.update(&file_path, &transformed_data).await?; + + Ok(()) + } + + /// Create RRD file with appropriate schema via backend + async fn create_rrd_file(&mut self, key_type: &RrdKeyType, file_path: &Path) -> Result<()> { + // Ensure parent directory exists + if let Some(parent) = file_path.parent() { + fs::create_dir_all(parent) + .with_context(|| format!("Failed to create directory: {parent:?}"))?; + } + + // Get schema for this RRD type + let schema = key_type.schema(); + + // Calculate start time (at day boundary, matching C implementation) + let now = Utc::now(); + let start = now + .date_naive() + .and_hms_opt(0, 0, 0) + .expect("00:00:00 is always a valid time") + .and_utc(); + let start_timestamp = start.timestamp(); + + tracing::debug!( + "Creating RRD file: {:?} with {} data sources via {}", + file_path, + schema.column_count(), + self.backend.name() + ); + + // Delegate to backend for creation + self.backend + .create(file_path, &schema, start_timestamp) + .await?; + + tracing::info!("Created RRD file: {:?} ({})", file_path, schema); + + Ok(()) + } + + /// Transform data from source format to target format + /// + /// This implements the C behavior from status.c: + /// 1. Skip non-archivable columns only for old formats (uptime, status for nodes) + /// 2. Pad old format data with `:U` for missing columns + /// 3. Truncate future format data to known columns + /// + /// # Arguments + /// * `data` - Raw data string from status update (format: "timestamp:v1:v2:...") + /// * `source_format` - Format indicated by the input key + /// * `target_schema` - Target RRD schema (always Pve9_0 currently) + /// * `metric_type` - Type of metric (Node, VM, Storage) for column skipping + /// + /// # Returns + /// Transformed data string ready for RRD update + fn transform_data( + data: &str, + source_format: RrdFormat, + target_schema: &RrdSchema, + metric_type: MetricType, + ) -> Result<String> { + let mut parts = data.split(':'); + + let timestamp = parts + .next() + .ok_or_else(|| anyhow::anyhow!("Empty data string"))?; + + // Skip non-archivable columns for old format only (C: status.c:1300, 1335, 1385) + let skip_count = if source_format == RrdFormat::Pve2 { + metric_type.skip_columns() + } else { + 0 + }; + + // Build transformed data: timestamp + values (skipped, padded/truncated to target_cols) + let target_cols = target_schema.column_count(); + + // Join values with ':' separator, efficiently building the string without Vec allocation + let mut iter = parts + .skip(skip_count) + .chain(std::iter::repeat("U")) + .take(target_cols); + let values = match iter.next() { + Some(first) => { + // Start with first value, fold remaining values with separator + iter.fold(first.to_string(), |mut acc, value| { + acc.push(':'); + acc.push_str(value); + acc + }) + } + None => String::new(), + }; + + Ok(format!("{timestamp}:{values}")) + } + + /// Flush all pending updates + #[allow(dead_code)] // Used via RRD update cycle + pub(crate) async fn flush(&mut self) -> Result<()> { + self.backend.flush().await + } + + /// Get base directory + #[allow(dead_code)] // Used for path resolution in updates + pub(crate) fn base_dir(&self) -> &Path { + &self.base_dir + } +} + +impl Drop for RrdWriter { + fn drop(&mut self) { + // Note: We can't flush in Drop since it's async + // Users should call flush() explicitly before dropping if needed + tracing::debug!("RrdWriter dropped"); + } +} + +#[cfg(test)] +mod tests { + use super::super::schema::{RrdFormat, RrdSchema}; + use super::*; + + #[test] + fn test_rrd_file_path_generation() { + let temp_dir = std::path::PathBuf::from("/tmp/test"); + + let key_node = RrdKeyType::Node { + nodename: "testnode".to_string(), + format: RrdFormat::Pve9_0, + }; + let path = key_node.file_path(&temp_dir); + assert_eq!(path, temp_dir.join("pve-node-9.0").join("testnode")); + } + + // ===== Format Adaptation Tests ===== + + #[test] + fn test_transform_data_node_pve2_to_pve9() { + // Test padding old format (12 cols) to new format (19 cols) + // Input: timestamp:uptime:status:load:maxcpu:cpu:iowait:memtotal:memused:swap_t:swap_u:netin:netout + let data = "1234567890:1000:0:1.5:4:2.0:0.5:8000000000:6000000000:0:0:1000000:500000"; + + let schema = RrdSchema::node(RrdFormat::Pve9_0); + let result = + RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Node).unwrap(); + + // After skipping 2 cols (uptime, status) and padding with 7 U's: + // timestamp:load:maxcpu:cpu:iowait:memtotal:memused:swap_t:swap_u:netin:netout:U:U:U:U:U:U:U + let parts: Vec<&str> = result.split(':').collect(); + assert_eq!(parts[0], "1234567890", "Timestamp should be preserved"); + assert_eq!(parts.len(), 20, "Should have timestamp + 19 values"); // 1 + 19 + assert_eq!(parts[1], "1.5", "First value after skip should be load"); + assert_eq!(parts[2], "4", "Second value should be maxcpu"); + + // Check padding + for (i, item) in parts.iter().enumerate().take(20).skip(12) { + assert_eq!(item, &"U", "Column {} should be padded with U", i); + } + } + + #[test] + fn test_transform_data_vm_pve2_to_pve9() { + // Test VM transformation with 4 columns skipped + // Input: timestamp:uptime:status:template:pid:maxcpu:cpu:maxmem:mem:maxdisk:disk:netin:netout:diskread:diskwrite + let data = "1234567890:1000:1:0:12345:4:2:4096:2048:100000:50000:1000:500:100:50"; + + let schema = RrdSchema::vm(RrdFormat::Pve9_0); + let result = + RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Vm).unwrap(); + + let parts: Vec<&str> = result.split(':').collect(); + assert_eq!(parts[0], "1234567890"); + assert_eq!(parts.len(), 18, "Should have timestamp + 17 values"); + assert_eq!(parts[1], "4", "First value after skip should be maxcpu"); + + // Check padding (last 7 columns) + for (i, item) in parts.iter().enumerate().take(18).skip(11) { + assert_eq!(item, &"U", "Column {} should be padded", i); + } + } + + #[test] + fn test_transform_data_no_padding_needed() { + // Test when source and target have same column count + let data = "1234567890:1.5:4:2.0:0.5:8000000000:6000000000:0:0:0:0:1000000:500000:7000000000:0:0:0:0:0:0"; + + let schema = RrdSchema::node(RrdFormat::Pve9_0); + let result = + RrdWriter::transform_data(data, RrdFormat::Pve9_0, &schema, MetricType::Node).unwrap(); + + // No transformation should occur (same format) + let parts: Vec<&str> = result.split(':').collect(); + assert_eq!(parts.len(), 20); // timestamp + 19 values + assert_eq!(parts[1], "1.5"); + } + + #[test] + fn test_transform_data_future_format_truncation() { + // Test truncation of future format with extra columns + let data = "1234567890:1:2:3:4:5:6:7:8:9:10:11:12:13:14:15:16:17:18:19:20:21:22:23:24:25"; + + let schema = RrdSchema::node(RrdFormat::Pve9_0); + // Simulating future format that has 25 columns + let result = + RrdWriter::transform_data(data, RrdFormat::Pve9_0, &schema, MetricType::Node).unwrap(); + + let parts: Vec<&str> = result.split(':').collect(); + assert_eq!(parts.len(), 20, "Should truncate to timestamp + 19 values"); + assert_eq!(parts[19], "19", "Last value should be column 19"); + } + + #[test] + fn test_transform_data_storage_no_change() { + // Storage format is same for Pve2 and Pve9_0 (2 columns, no skipping) + let data = "1234567890:1000000000000:500000000000"; + + let schema = RrdSchema::storage(RrdFormat::Pve9_0); + let result = + RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Storage).unwrap(); + + assert_eq!(result, data, "Storage data should not be transformed"); + } + + #[test] + fn test_metric_type_methods() { + assert_eq!(MetricType::Node.skip_columns(), 2); + assert_eq!(MetricType::Vm.skip_columns(), 4); + assert_eq!(MetricType::Storage.skip_columns(), 0); + } + + #[test] + fn test_format_column_counts() { + assert_eq!(RrdFormat::Pve2.column_count(&MetricType::Node), 12); + assert_eq!(RrdFormat::Pve9_0.column_count(&MetricType::Node), 19); + assert_eq!(RrdFormat::Pve2.column_count(&MetricType::Vm), 10); + assert_eq!(RrdFormat::Pve9_0.column_count(&MetricType::Vm), 17); + assert_eq!(RrdFormat::Pve2.column_count(&MetricType::Storage), 2); + assert_eq!(RrdFormat::Pve9_0.column_count(&MetricType::Storage), 2); + } +} -- 2.47.3 _______________________________________________ pve-devel mailing list [email protected] https://lists.proxmox.com/cgi-bin/mailman/listinfo/pve-devel
