This is an automated email from the ASF dual-hosted git repository.

guanmingchiu pushed a commit to branch dev-qdp
in repository https://gitbox.apache.org/repos/asf/mahout.git


The following commit(s) were added to refs/heads/dev-qdp by this push:
     new 3cb194a78 [QDP] add DataLoader Test  (#687)
3cb194a78 is described below

commit 3cb194a78171819c3866dba32ed53a36f72cb4f0
Author: Ping <[email protected]>
AuthorDate: Sat Dec 6 23:41:56 2025 +0800

    [QDP] add DataLoader Test  (#687)
    
    * Batch Throughput
    
    Signed-off-by: 400Ping <[email protected]>
    
    * fix
    
    Signed-off-by: 400Ping <[email protected]>
    
    * [chore] add comment
    
    Signed-off-by: 400Ping <[email protected]>
    
    ---------
    
    Signed-off-by: 400Ping <[email protected]>
---
 qdp/docs/test/README.md                        |   8 ++
 qdp/qdp-core/examples/dataloader_throughput.rs | 148 +++++++++++++++++++++++++
 2 files changed, 156 insertions(+)

diff --git a/qdp/docs/test/README.md b/qdp/docs/test/README.md
index ac7d16994..1c24ba783 100644
--- a/qdp/docs/test/README.md
+++ b/qdp/docs/test/README.md
@@ -23,6 +23,14 @@ Unit tests for QDP core library covering input validation, API workflows, and me
 - Concurrent state vector management
 - DLPack tensor metadata validation
 
+### `examples/dataloader_throughput.rs` - DataLoader Batch Throughput
+
+- Simulates a QML training loop that streams batches of 64 vectors
+- Producer/consumer model with configurable prefetch to avoid GPU starvation
+- Reports vectors-per-second to verify QDP keeps the GPU busy
+- Run: `cargo run -p qdp-core --example dataloader_throughput --release`
+- Environment overrides: `BATCHES=<usize>` (default 200), `PREFETCH=<usize>` (default 16)
+
 ### `common/mod.rs` - Test Utilities
 
 - `create_test_data(size)`: Generates normalized test data
diff --git a/qdp/qdp-core/examples/dataloader_throughput.rs b/qdp/qdp-core/examples/dataloader_throughput.rs
new file mode 100644
index 000000000..f13d85bac
--- /dev/null
+++ b/qdp/qdp-core/examples/dataloader_throughput.rs
@@ -0,0 +1,148 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// DataLoader-style throughput test
+// Simulates a QML training loop that keeps the GPU fed with batches of vectors.
+// Run: cargo run -p qdp-core --example dataloader_throughput --release
+
+use std::env;
+use std::sync::mpsc;
+use std::thread;
+use std::time::{Duration, Instant};
+
+use qdp_core::QdpEngine;
+
const BATCH_SIZE: usize = 64;
const VECTOR_LEN: usize = 1024; // 2^10
const NUM_QUBITS: usize = 10;

/// Builds one deterministic sample vector of length `VECTOR_LEN`.
///
/// Element `i` is `((i + seed) & (VECTOR_LEN - 1)) / VECTOR_LEN`, i.e. a cheap
/// power-of-two mask replaces a modulo so host-side data generation stays far
/// cheaper than the GPU work it feeds.
fn build_sample(seed: u64) -> Vec<f64> {
    // VECTOR_LEN is a power of two, so `& mask` is equivalent to `% VECTOR_LEN`.
    let mask = (VECTOR_LEN - 1) as u64;
    let scale = 1.0 / VECTOR_LEN as f64;
    (0..VECTOR_LEN)
        .map(|i| ((i as u64 + seed) & mask) as f64 * scale)
        .collect()
}
+
+/// Entry point: measures how many vectors per second the engine can
+/// amplitude-encode when fed by a producer thread through a bounded channel.
+fn main() {
+    println!("=== QDP DataLoader Throughput ===");
+
+    // Device index 0; exit gracefully so the example is a no-op on CPU-only hosts.
+    let engine = match QdpEngine::new(0) {
+        Ok(engine) => engine,
+        Err(e) => {
+            eprintln!("CUDA unavailable or initialization failed: {:?}", e);
+            return;
+        }
+    };
+
+    // Workload size and queue depth are overridable via env vars (see README).
+    let total_batches: usize = env::var("BATCHES")
+        .ok()
+        .and_then(|v| v.parse().ok())
+        .unwrap_or(200);
+    // PREFETCH must be > 0: sync_channel(0) would make every send rendezvous
+    // with a recv, defeating the prefetching this example demonstrates.
+    let prefetch_depth: usize = env::var("PREFETCH")
+        .ok()
+        .and_then(|v| v.parse().ok())
+        .filter(|v| *v > 0)
+        .unwrap_or(16);
+    let report_interval = Duration::from_secs(1);
+
+    println!("Config:");
+    println!("  batch size   : {}", BATCH_SIZE);
+    println!("  vector length: {}", VECTOR_LEN);
+    println!("  num qubits   : {}", NUM_QUBITS);
+    println!("  batches      : {}", total_batches);
+    println!("  prefetch     : {}", prefetch_depth);
+    println!("  env overrides: BATCHES=<usize> PREFETCH=<usize>");
+    println!();
+
+    // Bounded channel: the producer blocks once `prefetch_depth` batches are
+    // queued, bounding host memory while keeping the consumer supplied.
+    let (tx, rx) = mpsc::sync_channel(prefetch_depth);
+
+    // Producer thread: generates batches on the CPU ahead of the consumer.
+    let producer = thread::spawn(move || {
+        for batch_idx in 0..total_batches {
+            let mut batch = Vec::with_capacity(BATCH_SIZE);
+            let seed_base = (batch_idx * BATCH_SIZE) as u64;
+            for i in 0..BATCH_SIZE {
+                batch.push(build_sample(seed_base + i as u64));
+            }
+            // A send error means the receiver hung up (e.g. early return on
+            // encode failure); stop producing instead of panicking.
+            if tx.send(batch).is_err() {
+                break;
+            }
+        }
+    });
+
+    let mut total_vectors = 0usize;
+    let mut last_report = Instant::now();
+    let start = Instant::now();
+
+    for (batch_idx, batch) in rx.iter().enumerate() {
+        // NOTE: The DataLoader produces host-side batches of size BATCH_SIZE,
+        // but we currently submit each sample to the GPU one-by-one.
+        // From the GPU's perspective this is effectively "batch size = 1"
+        // per encode call; batching is only happening on the host side.
+        for sample in batch {
+            match engine.encode(&sample, NUM_QUBITS, "amplitude") {
+                // SAFETY: `encode` hands back a raw pointer to a managed
+                // tensor; `take()`-ing the deleter ensures it is invoked at
+                // most once to release the tensor. Assumes ownership transfers
+                // to the caller — TODO confirm against QdpEngine::encode docs.
+                Ok(ptr) => unsafe {
+                    let managed = &mut *ptr;
+                    if let Some(deleter) = managed.deleter.take() {
+                        deleter(ptr);
+                    }
+                },
+                Err(e) => {
+                    eprintln!(
+                        "Encode failed on batch {} (vector {}): {:?}",
+                        batch_idx,
+                        total_vectors,
+                        e
+                    );
+                    return;
+                }
+            }
+        }
+
+        total_vectors += BATCH_SIZE;
+
+        // Periodic progress report (~1/sec) with cumulative throughput.
+        if last_report.elapsed() >= report_interval {
+            let elapsed = start.elapsed().as_secs_f64().max(1e-6);
+            let throughput = total_vectors as f64 / elapsed;
+            println!(
+                "Processed {:4} batches / {:6} vectors -> {:8.1} vectors/sec",
+                batch_idx + 1,
+                total_vectors,
+                throughput
+            );
+            last_report = Instant::now();
+        }
+
+        if batch_idx + 1 >= total_batches {
+            break;
+        }
+    }
+
+    // Producer has either finished or seen the channel close; join to make
+    // sure the thread exits before we report.
+    let _ = producer.join();
+
+    // `.max(1e-6)` guards against division by ~zero on extremely fast runs.
+    let duration = start.elapsed();
+    let throughput = total_vectors as f64 / duration.as_secs_f64().max(1e-6);
+    println!();
+    println!(
+        "=== Completed {} batches ({} vectors) in {:.2?} -> {:.1} vectors/sec ===",
+        total_batches, total_vectors, duration, throughput
+    );
+}

Reply via email to