Looks good to me.

On the Ryzen setup with a 10G network between PVE and PBS, NVMe
storage and default settings (50G image with 95% random data):
default (4 threads, 16 futures): 995 MB/s
8 threads, 16 futures: 1023 MB/s
8 threads, 32 futures: 989 MB/s

This link has lower latency, and the CPUs have a higher frequency and
IPC as well as SHA_NI acceleration. This seems to make a difference.

On the dual-socket Xeon system with a big RAID-10 of NVMe SSDs and two
100 Gbps links in an LACP bond to a similarly configured PBS, using 3
images (one 10G image with a lot of zeroes, the other two 100G each
with random data):

threads/futures    zeroes 10G    random 100G   random 100G
default            3334 MB/s     763 MB/s      764 MB/s
 8/16              4190 MB/s     819 MB/s      792 MB/s
 8/32              4910 MB/s     894 MB/s      776 MB/s
 8/64              5133 MB/s     849 MB/s      902 MB/s
16/16              3906 MB/s     818 MB/s      783 MB/s
16/32              4879 MB/s     887 MB/s      872 MB/s
16/64              5024 MB/s     927 MB/s      936 MB/s
12/96              5288 MB/s     903 MB/s      874 MB/s
16/128             5340 MB/s     958 MB/s      954 MB/s
24/128             5343 MB/s     950 MB/s      950 MB/s

These systems favor higher parallelism: the CPUs are older and clocked
lower, the PVE node has no SHA_NI support, and the storage has slightly
higher latency.

Thank you for taking this on!

Adam

On Mon, 2025-07-14 at 10:34 +0200, Dominik Csapak wrote:
> by using async futures to load chunks and stream::buffer_unordered to
> buffer up to 16 of them, depending on write/load speed. Use tokio's
> task spawn to make sure they continue to run in the background, since
> buffer_unordered starts them but does not poll them to completion
> unless we're awaiting.
> 
> With this, we don't need to increase the number of threads in the
> runtime to trigger parallel reads and network traffic to us. This way
> it's only limited by the CPU if decoding and/or decrypting is the
> bottleneck.
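
A minimal, self-contained sketch of the buffer_unordered + spawn pattern
described above (not the actual src/restore.rs code; fetch_chunk, the
100-chunk count and the sleep are illustrative stand-ins; assumes the
tokio, futures and anyhow crates):

use anyhow::Error;
use futures::StreamExt;

// stand-in for the real chunk fetch (AsyncReadChunk::read_chunk in the patch)
async fn fetch_chunk(pos: usize) -> Result<Vec<u8>, Error> {
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    Ok(vec![0u8; 4096])
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    let concurrency = 16; // PBS_RESTORE_FETCH_CONCURRENCY in the patch

    // lazily build one future per chunk index; nothing runs yet
    let read_queue = (0..100usize).map(|pos| async move {
        // spawn so the fetch keeps being polled by the runtime even while
        // the consumer below is busy writing a previously fetched chunk
        let data = tokio::task::spawn(fetch_chunk(pos)).await??;
        Ok::<_, Error>((pos, data))
    });

    // keep up to `concurrency` fetches in flight, yield them as they finish
    let mut stream = futures::stream::iter(read_queue).buffer_unordered(concurrency);

    let mut count = 0;
    while let Some(res) = stream.next().await {
        let (pos, data) = res?;
        // results arrive out of order, so progress is based on a counter,
        // not on the chunk's index position
        count += 1;
        println!("chunk {pos} done ({} bytes), {count}/100", data.len());
    }
    Ok(())
}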
> 
> Benchmark results:
> 
> 2 scenarios:
> A:  Network: over a local bridge, so not real limit
>     Datastore: A fast NVME SSD
>     Backup: 60GiB
> 
> B:  Network: over a 10G link (same switch)
>     Datastore: Spinner Raid10 with 6 disks, 2 NVME special devices
>     Backup: 60GiB
> 
> concurrency   duration A   speed A     duration B   speed B
> 1 (current)   110s         557MB/s     739s         83MB/s
> 1 (new)       111s         550MB/s     737s         83MB/s
> 4             55s          1128MB/s    246s         249MB/s
> 8             42s          1446MB/s    171s         358MB/s
> 12            37s          1642MB/s    150s         408MB/s
> 16            40s          1514MB/s    140s         436MB/s
> 20            37s          1638MB/s    136s         448MB/s
> 
> I saw an increase in CPU usage proportional to the speed increase:
> while the current version uses less than a single core in total,
> using 16 parallel futures resulted in 3-4 of the available tokio
> runtime threads being utilized.
> 
> The concurrency and the number of threads can be set with the
> environment variables PBS_RESTORE_FETCH_CONCURRENCY and
> PBS_RESTORE_MAX_THREADS respectively, since there is no universal
> way to know which settings are best or wanted.
> 
> In any case, if the target and/or source storage is too slow, there
> will be back/forward pressure, and this change should only matter for
> storage systems where IO depth plays a role and that are fast enough.
> 
> The way we count the finished chunks also changes a bit: since they
> can arrive out of order, we can't rely on the index position to
> calculate the percentage.
> 
> This patch is loosely based on the patch from Adam Kalisz[0], but
> removes the need to increase the blocking threads and uses the
> (actually always used) underlying async implementation for reading
> remote chunks.
> 
> 0:
> https://lore.proxmox.com/pve-devel/mailman.719.1751052794.395.pve-de...@lists.proxmox.com/
> 
> Signed-off-by: Dominik Csapak <d.csa...@proxmox.com>
> Based-on-patch-by: Adam Kalisz <adam.kal...@notnullmakers.com>
> ---
>  src/restore.rs | 80 +++++++++++++++++++++++++++++++++++++++-----------
>  1 file changed, 63 insertions(+), 17 deletions(-)
> 
> diff --git a/src/restore.rs b/src/restore.rs
> index 5a5a398..6cafd78 100644
> --- a/src/restore.rs
> +++ b/src/restore.rs
> @@ -2,6 +2,7 @@ use std::convert::TryInto;
>  use std::sync::{Arc, Mutex};
>  
>  use anyhow::{bail, format_err, Error};
> +use futures::StreamExt;
>  use once_cell::sync::OnceCell;
>  use tokio::runtime::Runtime;
>  
> @@ -13,7 +14,7 @@ use pbs_datastore::cached_chunk_reader::CachedChunkReader;
>  use pbs_datastore::data_blob::DataChunkBuilder;
>  use pbs_datastore::fixed_index::FixedIndexReader;
>  use pbs_datastore::index::IndexFile;
> -use pbs_datastore::read_chunk::ReadChunk;
> +use pbs_datastore::read_chunk::AsyncReadChunk;
>  use pbs_datastore::BackupManifest;
>  use pbs_key_config::load_and_decrypt_key;
>  use pbs_tools::crypt_config::CryptConfig;
> @@ -29,6 +30,12 @@ struct ImageAccessInfo {
>      archive_size: u64,
>  }
>  
> +//the default number of buffered futures that concurrently load chunks
> +const MAX_BUFFERED_FUTURES: usize = 16;
> +
> +// the default number of maximum worker threads for tokio
> +const MAX_WORKER_THREADS: usize = 4;
> +
>  pub(crate) struct RestoreTask {
>      setup: BackupSetup,
>      runtime: Arc<Runtime>,
> @@ -66,11 +73,17 @@ impl RestoreTask {
>      }
>  
>      pub fn new(setup: BackupSetup) -> Result<Self, Error> {
> +        let worker_threads = std::env::var("PBS_RESTORE_MAX_THREADS")
> +            .ok()
> +            .and_then(|val| val.parse::<usize>().ok())
> +            .unwrap_or(MAX_WORKER_THREADS);
> +        eprintln!("using up to {worker_threads} threads");
>          let runtime = get_runtime_with_builder(|| {
>              let mut builder = tokio::runtime::Builder::new_multi_thread();
>              builder.enable_all();
> +            // we don't use much blocking code, so two should be enough
>              builder.max_blocking_threads(2);
> -            builder.worker_threads(4);
> +            builder.worker_threads(worker_threads);
>              builder.thread_name("proxmox-restore-worker");
>              builder
>          });
> @@ -165,26 +178,59 @@ impl RestoreTask {
>  
>          let start_time = std::time::Instant::now();
>  
> -        for pos in 0..index.index_count() {
> -            let digest = index.index_digest(pos).unwrap();
> +        let read_queue = (0..index.index_count()).map(|pos| {
> +            let digest = *index.index_digest(pos).unwrap();
>              let offset = (pos * index.chunk_size) as u64;
> -            if digest == &zero_chunk_digest {
> -                let res = write_zero_callback(offset, index.chunk_size as u64);
> -                if res < 0 {
> -                    bail!("write_zero_callback failed ({})", res);
> +            let chunk_reader = chunk_reader.clone();
> +            async move {
> +                let chunk = if digest == zero_chunk_digest {
> +                    None
> +                } else {
> +                    let raw_data = tokio::task::spawn(async move {
> +                        AsyncReadChunk::read_chunk(&chunk_reader, &digest).await
> +                    })
> +                    .await??;
> +                    Some(raw_data)
> +                };
> +
> +                Ok::<_, Error>((chunk, offset))
> +            }
> +        });
> +
> +        let concurrency = std::env::var("PBS_RESTORE_FETCH_CONCURRENCY")
> +            .ok()
> +            .and_then(|val| val.parse::<usize>().ok())
> +            .unwrap_or(MAX_BUFFERED_FUTURES);
> +        eprintln!("fetching up to {concurrency} chunks in parallel");
> +
> +        // this buffers futures and pre-fetches some chunks for us
> +        let mut stream = futures::stream::iter(read_queue).buffer_unordered(concurrency);
> +
> +        let mut count = 0;
> +        while let Some(res) = stream.next().await {
> +            let res = res?;
> +            match res {
> +                (None, offset) => {
> +                    let res = write_zero_callback(offset, index.chunk_size as u64);
> +                    if res < 0 {
> +                        bail!("write_zero_callback failed ({})", res);
> +                    }
> +                    bytes += index.chunk_size;
> +                    zeroes += index.chunk_size;
>                  }
> -                bytes += index.chunk_size;
> -                zeroes += index.chunk_size;
> -            } else {
> -                let raw_data = ReadChunk::read_chunk(&chunk_reader, digest)?;
> -                let res = write_data_callback(offset, &raw_data);
> -                if res < 0 {
> -                    bail!("write_data_callback failed ({})", res);
> +                (Some(raw_data), offset) => {
> +                    let res = write_data_callback(offset, &raw_data);
> +                    if res < 0 {
> +                        bail!("write_data_callback failed ({})", res);
> +                    }
> +                    bytes += raw_data.len();
>                  }
> -                bytes += raw_data.len();
>              }
> +
> +            count += 1;
> +
>              if verbose {
> -                let next_per = ((pos + 1) * 100) / index.index_count();
> +                let next_per = (count * 100) / index.index_count();
>                  if per != next_per {
>                      eprintln!(
>                          "progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
