This is an automated email from the ASF dual-hosted git repository.
guanmingchiu pushed a commit to branch dev-qdp
in repository https://gitbox.apache.org/repos/asf/mahout.git
The following commit(s) were added to refs/heads/dev-qdp by this push:
new 3cb194a78 [QDP] add DataLoader Test (#687)
3cb194a78 is described below
commit 3cb194a78171819c3866dba32ed53a36f72cb4f0
Author: Ping <[email protected]>
AuthorDate: Sat Dec 6 23:41:56 2025 +0800
[QDP] add DataLoader Test (#687)
* Batch Throughput
Signed-off-by: 400Ping <[email protected]>
* fix
Signed-off-by: 400Ping <[email protected]>
* [chore] add comment
Signed-off-by: 400Ping <[email protected]>
---------
Signed-off-by: 400Ping <[email protected]>
---
qdp/docs/test/README.md | 8 ++
qdp/qdp-core/examples/dataloader_throughput.rs | 148 +++++++++++++++++++++++++
2 files changed, 156 insertions(+)
diff --git a/qdp/docs/test/README.md b/qdp/docs/test/README.md
index ac7d16994..1c24ba783 100644
--- a/qdp/docs/test/README.md
+++ b/qdp/docs/test/README.md
@@ -23,6 +23,14 @@ Unit tests for QDP core library covering input validation, API workflows, and me
- Concurrent state vector management
- DLPack tensor metadata validation
+### `examples/dataloader_throughput.rs` - DataLoader Batch Throughput
+
+- Simulates a QML training loop that streams batches of 64 vectors
+- Producer/consumer model with configurable prefetch to avoid GPU starvation
+- Reports vectors-per-second to verify QDP keeps the GPU busy
+- Run: `cargo run -p qdp-core --example dataloader_throughput --release`
+- Environment overrides: `BATCHES=<usize>` (default 200), `PREFETCH=<usize>` (default 16)
+
### `common/mod.rs` - Test Utilities
- `create_test_data(size)`: Generates normalized test data
diff --git a/qdp/qdp-core/examples/dataloader_throughput.rs b/qdp/qdp-core/examples/dataloader_throughput.rs
new file mode 100644
index 000000000..f13d85bac
--- /dev/null
+++ b/qdp/qdp-core/examples/dataloader_throughput.rs
@@ -0,0 +1,148 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// DataLoader-style throughput test
+// Simulates a QML training loop that keeps the GPU fed with batches of vectors.
+// Run: cargo run -p qdp-core --example dataloader_throughput --release
+
+use std::env;
+use std::sync::mpsc;
+use std::thread;
+use std::time::{Duration, Instant};
+
+use qdp_core::QdpEngine;
+
// Benchmark dimensions: each batch carries BATCH_SIZE vectors, and each vector
// holds VECTOR_LEN = 2^NUM_QUBITS amplitudes.
const BATCH_SIZE: usize = 64;
const VECTOR_LEN: usize = 1024; // 2^10
const NUM_QUBITS: usize = 10;

/// Builds one deterministic sample vector of length `VECTOR_LEN`.
///
/// Uses a cheap pattern — `(i + seed) & (VECTOR_LEN - 1)` scaled into
/// `[0, 1)` — so host-side data generation never becomes the bottleneck.
/// The bit-mask stands in for a modulo; it is exact because `VECTOR_LEN`
/// is a power of two.
fn build_sample(seed: u64) -> Vec<f64> {
    let wrap_mask = (VECTOR_LEN - 1) as u64;
    let scale = 1.0 / VECTOR_LEN as f64;
    (0..VECTOR_LEN)
        .map(|i| ((i as u64 + seed) & wrap_mask) as f64 * scale)
        .collect()
}
+
+fn main() {
+ println!("=== QDP DataLoader Throughput ===");
+
+ let engine = match QdpEngine::new(0) {
+ Ok(engine) => engine,
+ Err(e) => {
+ eprintln!("CUDA unavailable or initialization failed: {:?}", e);
+ return;
+ }
+ };
+
+ let total_batches: usize = env::var("BATCHES")
+ .ok()
+ .and_then(|v| v.parse().ok())
+ .unwrap_or(200);
+ let prefetch_depth: usize = env::var("PREFETCH")
+ .ok()
+ .and_then(|v| v.parse().ok())
+ .filter(|v| *v > 0)
+ .unwrap_or(16);
+ let report_interval = Duration::from_secs(1);
+
+ println!("Config:");
+ println!(" batch size : {}", BATCH_SIZE);
+ println!(" vector length: {}", VECTOR_LEN);
+ println!(" num qubits : {}", NUM_QUBITS);
+ println!(" batches : {}", total_batches);
+ println!(" prefetch : {}", prefetch_depth);
+ println!(" env overrides: BATCHES=<usize> PREFETCH=<usize>");
+ println!();
+
+ let (tx, rx) = mpsc::sync_channel(prefetch_depth);
+
+ let producer = thread::spawn(move || {
+ for batch_idx in 0..total_batches {
+ let mut batch = Vec::with_capacity(BATCH_SIZE);
+ let seed_base = (batch_idx * BATCH_SIZE) as u64;
+ for i in 0..BATCH_SIZE {
+ batch.push(build_sample(seed_base + i as u64));
+ }
+ if tx.send(batch).is_err() {
+ break;
+ }
+ }
+ });
+
+ let mut total_vectors = 0usize;
+ let mut last_report = Instant::now();
+ let start = Instant::now();
+
+ for (batch_idx, batch) in rx.iter().enumerate() {
+ // NOTE: The DataLoader produces host-side batches of size BATCH_SIZE,
+ // but we currently submit each sample to the GPU one-by-one.
+ // From the GPU's perspective this is effectively "batch size = 1"
+ // per encode call; batching is only happening on the host side.
+ for sample in batch {
+ match engine.encode(&sample, NUM_QUBITS, "amplitude") {
+ Ok(ptr) => unsafe {
+ let managed = &mut *ptr;
+ if let Some(deleter) = managed.deleter.take() {
+ deleter(ptr);
+ }
+ },
+ Err(e) => {
+ eprintln!(
+ "Encode failed on batch {} (vector {}): {:?}",
+ batch_idx,
+ total_vectors,
+ e
+ );
+ return;
+ }
+ }
+ }
+
+ total_vectors += BATCH_SIZE;
+
+ if last_report.elapsed() >= report_interval {
+ let elapsed = start.elapsed().as_secs_f64().max(1e-6);
+ let throughput = total_vectors as f64 / elapsed;
+ println!(
+ "Processed {:4} batches / {:6} vectors -> {:8.1} vectors/sec",
+ batch_idx + 1,
+ total_vectors,
+ throughput
+ );
+ last_report = Instant::now();
+ }
+
+ if batch_idx + 1 >= total_batches {
+ break;
+ }
+ }
+
+ let _ = producer.join();
+
+ let duration = start.elapsed();
+ let throughput = total_vectors as f64 / duration.as_secs_f64().max(1e-6);
+ println!();
+ println!(
+ "=== Completed {} batches ({} vectors) in {:.2?} -> {:.1} vectors/sec
===",
+ total_batches, total_vectors, duration, throughput
+ );
+}