Assuming that a service will hold the same dynamic resource usage on a
new node as on the previous node, score possible migrations, where:

- the cluster node imbalance is minimal (bruteforce), or

- the shifted root mean square and the maximum of the CPU and memory
  usages are minimal across the cluster nodes (TOPSIS).

Signed-off-by: Daniel Kral <[email protected]>
---
score_best_balancing_migrations() and select_best_balancing_migration()
are kept as separate functions because the single-result select could be
improved on its own in the future. The split might turn out to be
unnecessary and redundant though (especially since both need to be
exposed at perlmod and in PVE::HA::Usage::{Dynamic,Static}, i.e. twice).

 proxmox-resource-scheduling/src/scheduler.rs | 283 +++++++++++++++++++
 1 file changed, 283 insertions(+)

diff --git a/proxmox-resource-scheduling/src/scheduler.rs b/proxmox-resource-scheduling/src/scheduler.rs
index 58215f03..bd69cb2a 100644
--- a/proxmox-resource-scheduling/src/scheduler.rs
+++ b/proxmox-resource-scheduling/src/scheduler.rs
@@ -2,6 +2,9 @@ use anyhow::Error;
 
 use crate::topsis;
 
+use serde::{Deserialize, Serialize};
+use std::collections::BinaryHeap;
+
 /// Generic service stats.
 #[derive(Clone, Copy)]
 pub struct ServiceStats {
@@ -42,6 +45,18 @@ impl NodeStats {
         self.mem += service_stats.maxmem;
     }
 
+    /// Adds the service stats to the node stats as if the service is running on the node.
+    ///
+    /// Increases the node's `cpu` and `mem` by the service's `cpu` and `mem` values.
+    pub fn add_running_service(&mut self, service_stats: &ServiceStats) {
+        self.cpu += service_stats.cpu;
+        self.mem += service_stats.mem;
+    }
+
+    /// Removes the service stats from the node stats as if the service is not running on the node.
+    ///
+    /// The node's `cpu` and `mem` are clamped at zero: if the service reports more usage than is
+    /// currently accounted to the node, the subtraction must neither overflow the integer `mem`
+    /// (panic in debug builds, wrap-around in release builds) nor leave a negative `cpu` behind,
+    /// as both would distort every load derived from these stats.
+    pub fn remove_running_service(&mut self, service_stats: &ServiceStats) {
+        self.cpu = (self.cpu - service_stats.cpu).max(0.0);
+        self.mem = self.mem.saturating_sub(service_stats.mem);
+    }
+
     /// Returns the current cpu usage as a percentage.
     pub fn cpu_load(&self) -> f64 {
         self.cpu / self.maxcpu as f64
@@ -51,6 +66,45 @@ impl NodeStats {
     pub fn mem_load(&self) -> f64 {
         self.mem as f64 / self.maxmem as f64
     }
+
+    /// Returns a combined node usage as a percentage.
+    ///
+    /// This is the arithmetic mean of [`Self::cpu_load`] and [`Self::mem_load`], i.e. both
+    /// resources are weighted equally.
+    pub fn load(&self) -> f64 {
+        (self.cpu_load() + self.mem_load()) / 2.0
+    }
+}
+
+/// Collects the combined load (see [`NodeStats::load`]) of every node, in input order.
+fn calculate_node_loads(nodes: &[NodeStats]) -> Vec<f64> {
+    nodes.iter().map(NodeStats::load).collect()
+}
+
+/// Returns the load imbalance among the nodes.
+///
+/// The load imbalance is measured as the statistical dispersion of the individual node loads.
+///
+/// The current implementation uses the dimensionless coefficient of variation, which expresses the
+/// standard deviation in relation to the mean of the node loads. The standard deviation is
+/// calculated over all given nodes (population standard deviation).
+///
+/// The coefficient of variation is not a robust statistic, i.e. a single node with an outlying
+/// load noticeably changes the result.
+///
+/// Returns `0.0` if the sum of all node loads is `0.0`, which includes an empty `nodes` slice.
+// NOTE(review): the original sentence about robustness ended after "which is"; presumably the
+// sensitivity to outliers is intended so that single overloaded nodes are detected - confirm.
+fn calculate_node_imbalance(nodes: &[NodeStats]) -> f64 {
+    let node_count = nodes.len();
+    let node_loads = calculate_node_loads(nodes);
+
+    let load_sum = node_loads
+        .iter()
+        .fold(0.0, |sum, node_load| sum + node_load);
+
+    // load_sum is guaranteed to be 0.0 for empty nodes
+    // this check also keeps the final division from dividing by a mean of zero
+    if load_sum == 0.0 {
+        0.0
+    } else {
+        let load_mean = load_sum / node_count as f64;
+
+        let squared_diff_sum = node_loads
+            .iter()
+            .fold(0.0, |sum, node_load| sum + (node_load - load_mean).powi(2));
+        let load_sd = (squared_diff_sum / node_count as f64).sqrt();
+
+        load_sd / load_mean
+    }
 }
 
 pub struct NodeUsage {
@@ -79,6 +133,71 @@ criteria_struct! {
     static PVE_HA_TOPSIS_CRITERIA;
 }
 
+/// A possible migration.
+///
+/// The derived ordering compares `sid` first, then `source_node`, then `target_node`. The fields
+/// are (de)serialized with kebab-case names, e.g. `source-node`.
+#[derive(Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+pub struct Migration {
+    /// Service identifier.
+    pub sid: String,
+    /// The current node of the service.
+    pub source_node: String,
+    /// The possible migration target node for the service.
+    pub target_node: String,
+}
+
+/// A possible migration with a score.
+///
+/// The comparison traits are implemented manually for this type and only take `imbalance` into
+/// account.
+#[derive(Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+pub struct ScoredMigration {
+    /// The possible migration.
+    pub migration: Migration,
+    /// The expected node imbalance after the migration.
+    pub imbalance: f64,
+}
+
+impl Ord for ScoredMigration {
+    /// Orders scored migrations by `imbalance` only, in reverse: a lower imbalance compares as
+    /// greater, so that a max-heap such as [`BinaryHeap`] yields the lowest imbalance first.
+    ///
+    /// `f64::total_cmp` is used to get a total order over all `f64` values, including NaN.
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.imbalance.total_cmp(&other.imbalance).reverse()
+    }
+}
+
+impl PartialOrd for ScoredMigration {
+    /// Delegates to [`Ord::cmp`], so the partial order always agrees with the total order.
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl PartialEq for ScoredMigration {
+    /// Two scored migrations are equal if their `imbalance` values compare as equal; the
+    /// `migration` itself is not taken into account.
+    fn eq(&self, other: &Self) -> bool {
+        self.cmp(other) == std::cmp::Ordering::Equal
+    }
+}
+
+// Marker impl: equality is defined through the total order of `Ord::cmp`.
+impl Eq for ScoredMigration {}
+
+/// A possible migration candidate.
+///
+/// In addition to the fields of a [`Migration`], a candidate carries the service stats that are
+/// needed to simulate the migration. Converting a candidate into a [`Migration`] drops `stats`.
+#[derive(Clone)]
+pub struct MigrationCandidate {
+    /// Service identifier of a standalone or leading service.
+    pub sid: String,
+    /// The current node of the service.
+    pub source_node: String,
+    /// The possible migration target node for the service.
+    pub target_node: String,
+    /// The current stats of the service.
+    pub stats: ServiceStats,
+}
+
+impl From<MigrationCandidate> for Migration {
+    /// Turns a candidate into a plain migration by dropping the service stats.
+    fn from(candidate: MigrationCandidate) -> Self {
+        let MigrationCandidate {
+            sid,
+            source_node,
+            target_node,
+            stats: _,
+        } = candidate;
+
+        Self {
+            sid,
+            source_node,
+            target_node,
+        }
+    }
+}
+
 impl ClusterUsage {
     /// Instantiate cluster usage from node usages.
     pub fn from_nodes<I>(nodes: I) -> Self
@@ -90,6 +209,170 @@ impl ClusterUsage {
         }
     }
 
+    /// Returns a copy of the stats of every node, in the order of `self.nodes`.
+    fn node_stats(&self) -> Vec<NodeStats> {
+        self.nodes.iter().map(|node| node.stats).collect()
+    }
+
+    /// Returns the combined load of each node as `(node name, load)` pairs.
+    pub fn node_loads(&self) -> Vec<(String, f64)> {
+        let mut loads = Vec::with_capacity(self.nodes.len());
+
+        for node in self.nodes.iter() {
+            loads.push((node.name.to_string(), node.stats.load()));
+        }
+
+        loads
+    }
+
+    /// Returns the load imbalance among the nodes in their current state.
+    ///
+    /// See [`calculate_node_imbalance`] for more information.
+    pub fn node_imbalance(&self) -> f64 {
+        calculate_node_imbalance(&self.node_stats())
+    }
+
+    /// Returns the load imbalance among the nodes as if a specific service was moved.
+    ///
+    /// The stats of `migration` are removed from its source node and added to its target node on a
+    /// copy of the node stats; the cluster usage itself is not modified. A candidate whose source
+    /// and target node are the same moves nothing, so it yields the current node imbalance.
+    ///
+    /// See [`calculate_node_imbalance`] for more information.
+    pub fn node_imbalance_with_migration(&self, migration: &MigrationCandidate) -> f64 {
+        // Without this check, a same-node candidate would only hit the source branch below and be
+        // scored as if the service had vanished from the cluster.
+        let moves_service = migration.source_node != migration.target_node;
+
+        let new_node_stats = self
+            .nodes
+            .iter()
+            .map(|node| {
+                let mut new_stats = node.stats;
+
+                if moves_service {
+                    if node.name == migration.source_node {
+                        new_stats.remove_running_service(&migration.stats);
+                    } else if node.name == migration.target_node {
+                        new_stats.add_running_service(&migration.stats);
+                    }
+                }
+
+                new_stats
+            })
+            .collect::<Vec<_>>();
+
+        calculate_node_imbalance(&new_node_stats)
+    }
+
+    /// Score the service motions by the best node imbalance improvement with exhaustive search.
+    ///
+    /// Every candidate is scored with [`Self::node_imbalance_with_migration`]. At most `limit`
+    /// scored migrations are returned, starting with the lowest expected imbalance.
+    pub fn score_best_balancing_migrations<I>(
+        &self,
+        candidates: I,
+        limit: usize,
+    ) -> Result<Vec<ScoredMigration>, Error>
+    where
+        I: IntoIterator<Item = MigrationCandidate>,
+    {
+        let mut ranked = candidates
+            .into_iter()
+            .map(|candidate| ScoredMigration {
+                // score first, as the conversion below consumes the candidate
+                imbalance: self.node_imbalance_with_migration(&candidate),
+                migration: candidate.into(),
+            })
+            .collect::<BinaryHeap<_>>();
+
+        // BinaryHeap::into_iter_sorted() is still in nightly unfortunately, so pop the heap one by
+        // one until either `limit` entries were taken or the heap is exhausted
+        let best_alternatives = std::iter::from_fn(|| ranked.pop()).take(limit).collect();
+
+        Ok(best_alternatives)
+    }
+
+    /// Select the service motion with the best node imbalance improvement with exhaustive search.
+    ///
+    /// Returns the first entry of [`Self::score_best_balancing_migrations`] with a limit of one,
+    /// or `None` if there are no candidates.
+    pub fn select_best_balancing_migration<I>(
+        &self,
+        candidates: I,
+    ) -> Result<Option<ScoredMigration>, Error>
+    where
+        I: IntoIterator<Item = MigrationCandidate>,
+    {
+        self.score_best_balancing_migrations(candidates, 1)
+            .map(|migrations| migrations.into_iter().next())
+    }
+
+    /// Score the service motions by the best node imbalance improvement with the TOPSIS method.
+    ///
+    /// For every candidate the migration is simulated on a copy of the node stats. The root mean
+    /// square and the maximum of the resulting CPU and memory loads across all nodes, each shifted
+    /// by 1.0, form the candidate's row in the TOPSIS decision matrix.
+    ///
+    /// Returns at most `limit` migrations in the order returned by `topsis::rank_alternatives`.
+    /// The `imbalance` of each returned migration is computed separately with
+    /// [`Self::node_imbalance_with_migration`]; it is not the value the candidates are ranked by.
+    ///
+    /// # Errors
+    ///
+    /// Propagates errors from building the TOPSIS matrix or from ranking its alternatives.
+    pub fn score_best_balancing_migrations_topsis(
+        &self,
+        candidates: &[MigrationCandidate],
+        limit: usize,
+    ) -> Result<Vec<ScoredMigration>, Error> {
+        // Nothing to rank: yield an empty result like score_best_balancing_migrations() does for
+        // no candidates, instead of depending on how the TOPSIS matrix treats an empty input.
+        if candidates.is_empty() {
+            return Ok(Vec::new());
+        }
+
+        let node_count = self.nodes.len();
+
+        let matrix = candidates
+            .iter()
+            .map(|migration| {
+                let mut highest_cpu = 0.0;
+                let mut squares_cpu = 0.0;
+                let mut highest_mem = 0.0;
+                let mut squares_mem = 0.0;
+
+                let service = &migration.stats;
+                let source_node = &migration.source_node;
+                let target_node = &migration.target_node;
+
+                for node in self.nodes.iter() {
+                    // simulate the migration on a copy, the cluster usage stays untouched
+                    let mut new_stats = node.stats;
+
+                    if &node.name == source_node {
+                        new_stats.remove_running_service(service);
+                    } else if &node.name == target_node {
+                        new_stats.add_running_service(service);
+                    }
+
+                    let new_cpu_load = new_stats.cpu_load();
+                    highest_cpu = f64::max(highest_cpu, new_cpu_load);
+                    squares_cpu += new_cpu_load.powi(2);
+
+                    let new_mem_load = new_stats.mem_load();
+                    highest_mem = f64::max(highest_mem, new_mem_load);
+                    squares_mem += new_mem_load.powi(2);
+                }
+
+                // the `average_*` criteria hold the root mean square of the node loads
+                PveTopsisAlternative {
+                    average_cpu: 1.0 + (squares_cpu / node_count as f64).sqrt(),
+                    highest_cpu: 1.0 + highest_cpu,
+                    average_memory: 1.0 + (squares_mem / node_count as f64).sqrt(),
+                    highest_memory: 1.0 + highest_mem,
+                }
+                .into()
+            })
+            .collect::<Vec<_>>();
+
+        let best_alternatives =
+            topsis::rank_alternatives(&topsis::Matrix::new(matrix)?, &PVE_HA_TOPSIS_CRITERIA)?;
+
+        Ok(best_alternatives
+            .into_iter()
+            .take(limit)
+            .map(|i| {
+                let imbalance = self.node_imbalance_with_migration(&candidates[i]);
+
+                ScoredMigration {
+                    migration: candidates[i].clone().into(),
+                    imbalance,
+                }
+            })
+            .collect())
+    }
+
+    /// Select the service motion with the best node imbalance improvement with the TOPSIS search.
+    ///
+    /// Returns the first entry of [`Self::score_best_balancing_migrations_topsis`] with a limit
+    /// of one, or `None` if that yields no migration.
+    ///
+    /// # Errors
+    ///
+    /// Propagates any error of [`Self::score_best_balancing_migrations_topsis`].
+    pub fn select_best_balancing_migration_topsis(
+        &self,
+        candidates: &[MigrationCandidate],
+    ) -> Result<Option<ScoredMigration>, Error> {
+        let migrations = self.score_best_balancing_migrations_topsis(candidates, 1)?;
+
+        Ok(migrations.into_iter().next())
+    }
+
     /// Scores candidate `nodes` to start a `service` on. Scoring is done 
according to the static memory
     /// and CPU usages of the nodes as if the service would already be running 
on each.
     ///
-- 
2.47.3




Reply via email to