--- Begin Message ---
Looks good to me.
On the Ryzen setup with a 10G network between PVE and PBS, NVMe storage
and default settings (50G image with 95% random data):
default (4 threads, 16 futures): 995 MB/s
8 threads, 16 futures: 1023 MB/s
8 threads, 32 futures: 989 MB/s
This setup has lower link latency, CPUs with higher frequency and IPC,
and SHA_NI acceleration. That seems to make a difference.
On the dual-socket Xeon system with a big RAID-10 of NVMe SSDs and two
100 Gbps links in LACP to a similarly configured PBS, using 3 images
(one 10G image with a lot of zeroes, the other two 100G images with
random data):
threads, futures     zeroes 10G   random 100G #1   random 100G #2
default (4, 16)      3334 MB/s    763 MB/s         764 MB/s
8, 16                4190 MB/s    819 MB/s         792 MB/s
8, 32                4910 MB/s    894 MB/s         776 MB/s
8, 64                5133 MB/s    849 MB/s         902 MB/s
16, 16               3906 MB/s    818 MB/s         783 MB/s
16, 32               4879 MB/s    887 MB/s         872 MB/s
16, 64               5024 MB/s    927 MB/s         936 MB/s
12, 96               5288 MB/s    903 MB/s         874 MB/s
16, 128              5340 MB/s    958 MB/s         954 MB/s
24, 128              5343 MB/s    950 MB/s         950 MB/s
These systems favor higher parallelism: the CPUs are older and lower
clocked, the PVE node has no SHA_NI support, and the storage has
slightly higher latency.
Thank you for taking this on!
Adam
On Mon, 2025-07-14 at 10:34 +0200, Dominik Csapak wrote:
> by using async futures to load chunks and stream::buffer_unordered to
> buffer up to 16 of them, depending on write/load speed. Use tokio's
> task spawn to make sure they continue to run in the background, since
> buffer_unordered starts them but does not poll them to completion
> unless we're awaiting.
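>
> A minimal, self-contained sketch of that pattern (load_chunk is a
> hypothetical stand-in for the real chunk reader; 16 futures and 4
> worker threads are the defaults from the patch):
>
>     use std::time::Duration;
>
>     use futures::StreamExt;
>
>     // hypothetical stand-in for the real async chunk fetch
>     async fn load_chunk(digest: u32) -> Vec<u8> {
>         tokio::time::sleep(Duration::from_millis(10)).await; // fake I/O
>         vec![digest as u8; 8]
>     }
>
>     #[tokio::main(worker_threads = 4)]
>     async fn main() {
>         let queue = (0..64u32).map(|digest| {
>             // spawn each fetch so it keeps making progress in the
>             // background; buffer_unordered alone only polls the
>             // buffered futures while we await the stream
>             tokio::task::spawn(load_chunk(digest))
>         });
>         // keep up to 16 fetches in flight, yield them as they finish
>         let mut stream = futures::stream::iter(queue).buffer_unordered(16);
>         while let Some(res) = stream.next().await {
>             let chunk = res.expect("fetch task panicked");
>             // a write callback would consume the chunk here
>             println!("fetched chunk of {} bytes", chunk.len());
>         }
>     }
>
> Since the map is lazy, at most 16 tasks are spawned at any time; new
> ones are only started as completed ones are drained from the stream.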
>
> With this, we don't need to increase the number of threads in the
> runtime to trigger parallel reads and network traffic. This way it's
> only limited by CPU when decoding and/or decrypting is the bottleneck.
>
> Benchmark results:
>
> 2 scenarios:
> A: Network: over a local bridge, so no real limit
>    Datastore: a fast NVMe SSD
>    Backup: 60GiB
>
> B: Network: over a 10G link (same switch)
>    Datastore: spinner RAID10 with 6 disks, 2 NVMe special devices
>    Backup: 60GiB
>
> concurrency    duration A   speed A    duration B   speed B
> 1 (current)    110s         557MB/s    739s         83MB/s
> 1 (new)        111s         550MB/s    737s         83MB/s
> 4              55s          1128MB/s   246s         249MB/s
> 8              42s          1446MB/s   171s         358MB/s
> 12             37s          1642MB/s   150s         408MB/s
> 16             40s          1514MB/s   140s         436MB/s
> 20             37s          1638MB/s   136s         448MB/s
>
> I saw an increase in CPU usage proportional to the speed increase:
> while the current version uses less than a single core in total, using
> 16 parallel futures utilized 3-4 threads of the tokio runtime.
>
> The concurrency and the number of threads can be set with the
> environment variables PBS_RESTORE_FETCH_CONCURRENCY and
> PBS_RESTORE_MAX_THREADS respectively, since there is no universal way
> to know which settings are best or wanted.
>
> In any case, if the target and/or source storage is too slow, there
> will be back/forward pressure, and this change should only matter for
> storage systems where IO depth plays a role and that are fast enough.
>
> The way we count the finished chunks also changes a bit, since they
> can come back unordered, so we can't rely on the index position to
> calculate the percentage.
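>
> Continuing the earlier sketch, the consuming loop with this progress
> accounting would look roughly like this (total stands in for
> index.index_count()):
>
>     let total = 64; // number of chunks fed into the stream
>     let mut count = 0;
>     let mut per = 0;
>     while let Some(res) = stream.next().await {
>         let _chunk = res.expect("fetch task panicked");
>         count += 1;
>         // completions arrive in arbitrary order, so derive progress
>         // from the number of finished chunks, not the index position
>         let next_per = (count * 100) / total;
>         if next_per != per {
>             eprintln!("progress {next_per}%");
>             per = next_per;
>         }
>     }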
>
> This patch is loosely based on the patch from Adam Kalisz[0], but
> removes the need to increase the blocking threads and uses the
> (actually always used) underlying async implementation for reading
> remote chunks.
>
> 0: https://lore.proxmox.com/pve-devel/mailman.719.1751052794.395.pve-de...@lists.proxmox.com/
>
> Signed-off-by: Dominik Csapak <d.csa...@proxmox.com>
> Based-on-patch-by: Adam Kalisz <adam.kal...@notnullmakers.com>
> ---
> src/restore.rs | 80 +++++++++++++++++++++++++++++++++++++++-----------
> 1 file changed, 63 insertions(+), 17 deletions(-)
>
> diff --git a/src/restore.rs b/src/restore.rs
> index 5a5a398..6cafd78 100644
> --- a/src/restore.rs
> +++ b/src/restore.rs
> @@ -2,6 +2,7 @@ use std::convert::TryInto;
>  use std::sync::{Arc, Mutex};
>
>  use anyhow::{bail, format_err, Error};
> +use futures::StreamExt;
>  use once_cell::sync::OnceCell;
>  use tokio::runtime::Runtime;
>
> @@ -13,7 +14,7 @@ use pbs_datastore::cached_chunk_reader::CachedChunkReader;
>  use pbs_datastore::data_blob::DataChunkBuilder;
>  use pbs_datastore::fixed_index::FixedIndexReader;
>  use pbs_datastore::index::IndexFile;
> -use pbs_datastore::read_chunk::ReadChunk;
> +use pbs_datastore::read_chunk::AsyncReadChunk;
>  use pbs_datastore::BackupManifest;
>  use pbs_key_config::load_and_decrypt_key;
>  use pbs_tools::crypt_config::CryptConfig;
> @@ -29,6 +30,12 @@ struct ImageAccessInfo {
>      archive_size: u64,
>  }
>
> +// the default number of buffered futures that concurrently load chunks
> +const MAX_BUFFERED_FUTURES: usize = 16;
> +
> +// the default number of maximum worker threads for tokio
> +const MAX_WORKER_THREADS: usize = 4;
> +
>  pub(crate) struct RestoreTask {
>      setup: BackupSetup,
>      runtime: Arc<Runtime>,
> @@ -66,11 +73,17 @@ impl RestoreTask {
>      }
>
>      pub fn new(setup: BackupSetup) -> Result<Self, Error> {
> +        let worker_threads = std::env::var("PBS_RESTORE_MAX_THREADS")
> +            .ok()
> +            .and_then(|val| val.parse::<usize>().ok())
> +            .unwrap_or(MAX_WORKER_THREADS);
> +        eprintln!("using up to {worker_threads} threads");
>          let runtime = get_runtime_with_builder(|| {
>              let mut builder = tokio::runtime::Builder::new_multi_thread();
>              builder.enable_all();
> +            // we don't use much blocking code, so two should be enough
>              builder.max_blocking_threads(2);
> -            builder.worker_threads(4);
> +            builder.worker_threads(worker_threads);
>              builder.thread_name("proxmox-restore-worker");
>              builder
>          });
> @@ -165,26 +178,59 @@ impl RestoreTask {
>
>          let start_time = std::time::Instant::now();
>
> -        for pos in 0..index.index_count() {
> -            let digest = index.index_digest(pos).unwrap();
> +        let read_queue = (0..index.index_count()).map(|pos| {
> +            let digest = *index.index_digest(pos).unwrap();
>              let offset = (pos * index.chunk_size) as u64;
> -            if digest == &zero_chunk_digest {
> -                let res = write_zero_callback(offset, index.chunk_size as u64);
> -                if res < 0 {
> -                    bail!("write_zero_callback failed ({})", res);
> +            let chunk_reader = chunk_reader.clone();
> +            async move {
> +                let chunk = if digest == zero_chunk_digest {
> +                    None
> +                } else {
> +                    let raw_data = tokio::task::spawn(async move {
> +                        AsyncReadChunk::read_chunk(&chunk_reader, &digest).await
> +                    })
> +                    .await??;
> +                    Some(raw_data)
> +                };
> +
> +                Ok::<_, Error>((chunk, offset))
> +            }
> +        });
> +
> +        let concurrency = std::env::var("PBS_RESTORE_FETCH_CONCURRENCY")
> +            .ok()
> +            .and_then(|val| val.parse::<usize>().ok())
> +            .unwrap_or(MAX_BUFFERED_FUTURES);
> +        eprintln!("fetching up to {concurrency} chunks in parallel");
> +
> +        // this buffers futures and pre-fetches some chunks for us
> +        let mut stream = futures::stream::iter(read_queue).buffer_unordered(concurrency);
> +
> +        let mut count = 0;
> +        while let Some(res) = stream.next().await {
> +            let res = res?;
> +            match res {
> +                (None, offset) => {
> +                    let res = write_zero_callback(offset, index.chunk_size as u64);
> +                    if res < 0 {
> +                        bail!("write_zero_callback failed ({})", res);
> +                    }
> +                    bytes += index.chunk_size;
> +                    zeroes += index.chunk_size;
>                  }
> -                bytes += index.chunk_size;
> -                zeroes += index.chunk_size;
> -            } else {
> -                let raw_data = ReadChunk::read_chunk(&chunk_reader, digest)?;
> -                let res = write_data_callback(offset, &raw_data);
> -                if res < 0 {
> -                    bail!("write_data_callback failed ({})", res);
> +                (Some(raw_data), offset) => {
> +                    let res = write_data_callback(offset, &raw_data);
> +                    if res < 0 {
> +                        bail!("write_data_callback failed ({})", res);
> +                    }
> +                    bytes += raw_data.len();
>                  }
> -                bytes += raw_data.len();
>              }
> +
> +            count += 1;
> +
>              if verbose {
> -                let next_per = ((pos + 1) * 100) / index.index_count();
> +                let next_per = (count * 100) / index.index_count();
>                  if per != next_per {
>                      eprintln!(
>                          "progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
--- End Message ---