This is an automated email from the ASF dual-hosted git repository. guanmingchiu pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/mahout.git
commit cfb94fb86ac04f2de9d2003409dce43402304dd0 Author: Ping <[email protected]> AuthorDate: Sat Dec 6 23:41:56 2025 +0800 [QDP] add DataLoader Test (#687) * Batch Throughput Signed-off-by: 400Ping <[email protected]> * fix Signed-off-by: 400Ping <[email protected]> * [chore] add comment Signed-off-by: 400Ping <[email protected]> --------- Signed-off-by: 400Ping <[email protected]> --- qdp/docs/test/README.md | 8 ++ qdp/qdp-core/examples/dataloader_throughput.rs | 148 +++++++++++++++++++++++++ 2 files changed, 156 insertions(+) diff --git a/qdp/docs/test/README.md b/qdp/docs/test/README.md index ac7d16994..1c24ba783 100644 --- a/qdp/docs/test/README.md +++ b/qdp/docs/test/README.md @@ -23,6 +23,14 @@ Unit tests for QDP core library covering input validation, API workflows, and me - Concurrent state vector management - DLPack tensor metadata validation +### `examples/dataloader_throughput.rs` - DataLoader Batch Throughput + +- Simulates a QML training loop that streams batches of 64 vectors +- Producer/consumer model with configurable prefetch to avoid GPU starvation +- Reports vectors-per-second to verify QDP keeps the GPU busy +- Run: `cargo run -p qdp-core --example dataloader_throughput --release` +- Environment overrides: `BATCHES=<usize>` (default 200), `PREFETCH=<usize>` (default 16) + ### `common/mod.rs` - Test Utilities - `create_test_data(size)`: Generates normalized test data diff --git a/qdp/qdp-core/examples/dataloader_throughput.rs b/qdp/qdp-core/examples/dataloader_throughput.rs new file mode 100644 index 000000000..f13d85bac --- /dev/null +++ b/qdp/qdp-core/examples/dataloader_throughput.rs @@ -0,0 +1,148 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// DataLoader-style throughput test +// Simulates a QML training loop that keeps the GPU fed with batches of vectors. +// Run: cargo run -p qdp-core --example dataloader_throughput --release + +use std::env; +use std::sync::mpsc; +use std::thread; +use std::time::{Duration, Instant}; + +use qdp_core::QdpEngine; + +const BATCH_SIZE: usize = 64; +const VECTOR_LEN: usize = 1024; // 2^10 +const NUM_QUBITS: usize = 10; + +fn build_sample(seed: u64) -> Vec<f64> { + // Lightweight deterministic pattern to keep CPU generation cheap + let mask = (VECTOR_LEN - 1) as u64; // power-of-two mask instead of modulo + let scale = 1.0 / VECTOR_LEN as f64; + + let mut out = Vec::with_capacity(VECTOR_LEN); + for i in 0..VECTOR_LEN { + let mixed = (i as u64 + seed) & mask; + out.push(mixed as f64 * scale); + } + out +} + +fn main() { + println!("=== QDP DataLoader Throughput ==="); + + let engine = match QdpEngine::new(0) { + Ok(engine) => engine, + Err(e) => { + eprintln!("CUDA unavailable or initialization failed: {:?}", e); + return; + } + }; + + let total_batches: usize = env::var("BATCHES") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(200); + let prefetch_depth: usize = env::var("PREFETCH") + .ok() + .and_then(|v| v.parse().ok()) + .filter(|v| *v > 0) + .unwrap_or(16); + let report_interval = Duration::from_secs(1); + + println!("Config:"); + println!(" batch size : {}", BATCH_SIZE); + println!(" vector length: {}", VECTOR_LEN); + println!(" num qubits : {}", NUM_QUBITS); + println!(" batches : {}", total_batches); + println!(" prefetch : {}", prefetch_depth); + println!(" env overrides: BATCHES=<usize> PREFETCH=<usize>"); + println!(); + + let (tx, rx) = mpsc::sync_channel(prefetch_depth); + + let producer = thread::spawn(move || { + for batch_idx in 0..total_batches { + let mut batch = Vec::with_capacity(BATCH_SIZE); + let seed_base = (batch_idx * BATCH_SIZE) as u64; + for i in 0..BATCH_SIZE { + batch.push(build_sample(seed_base + i as u64)); + } + if tx.send(batch).is_err() { + break; + } + } + }); + + let mut total_vectors = 0usize; + let mut last_report = Instant::now(); + let start = Instant::now(); + + for (batch_idx, batch) in rx.iter().enumerate() { + // NOTE: The DataLoader produces host-side batches of size BATCH_SIZE, + // but we currently submit each sample to the GPU one-by-one. + // From the GPU's perspective this is effectively "batch size = 1" + // per encode call; batching is only happening on the host side. + for sample in batch { + match engine.encode(&sample, NUM_QUBITS, "amplitude") { + Ok(ptr) => unsafe { + let managed = &mut *ptr; + if let Some(deleter) = managed.deleter.take() { + deleter(ptr); + } + }, + Err(e) => { + eprintln!( + "Encode failed on batch {} (vector {}): {:?}", + batch_idx, + total_vectors, + e + ); + return; + } + } + } + + total_vectors += BATCH_SIZE; + + if last_report.elapsed() >= report_interval { + let elapsed = start.elapsed().as_secs_f64().max(1e-6); + let throughput = total_vectors as f64 / elapsed; + println!( + "Processed {:4} batches / {:6} vectors -> {:8.1} vectors/sec", + batch_idx + 1, + total_vectors, + throughput + ); + last_report = Instant::now(); + } + + if batch_idx + 1 >= total_batches { + break; + } + } + + let _ = producer.join(); + + let duration = start.elapsed(); + let throughput = total_vectors as f64 / duration.as_secs_f64().max(1e-6); + println!(); + println!( + "=== Completed {} batches ({} vectors) in {:.2?} -> {:.1} vectors/sec ===", + total_batches, total_vectors, duration, throughput + ); +}
