This is an automated email from the ASF dual-hosted git repository.

guanmingchiu pushed a commit to branch dev-qdp
in repository https://gitbox.apache.org/repos/asf/mahout.git

commit a74a85f9eb14e2f7cebbc9efc9627c7c0e5a091b
Author: Guan-Ming (Wesley) Chiu <[email protected]>
AuthorDate: Thu Dec 11 20:28:35 2025 +0800

    [QDP] Add data loader benchmark
    
    Co-Authored-By: Ping <[email protected]>
---
 docs/benchmarks/dataloader_throughput.md         |  69 ++++++
 qdp/benchmark/benchmark_dataloader_throughput.py | 284 +++++++++++++++++++++++
 2 files changed, 353 insertions(+)

diff --git a/docs/benchmarks/dataloader_throughput.md 
b/docs/benchmarks/dataloader_throughput.md
new file mode 100644
index 000000000..aa6a9901c
--- /dev/null
+++ b/docs/benchmarks/dataloader_throughput.md
@@ -0,0 +1,69 @@
+# DataLoader Throughput Benchmark
+
+This benchmark mirrors the `qdp-core/examples/dataloader_throughput.rs` pipeline and compares Mahout (QDP) against PennyLane and Qiskit on the same workload. It streams batches from a CPU-side producer, prepares amplitude-encoded states (on the GPU for Mahout; on the CPU followed by a copy to the GPU for PennyLane and Qiskit), and reports vectors per second.
+
+## Workload
+
+- Qubits: 16 (vector length `2^16`)
+- Batches: 200
+- Batch size: 64
+- Prefetch depth: 16 (CPU producer queue)
+
+## Running
+
+```bash
+# QDP-only Rust example
+cargo run -p qdp-core --example dataloader_throughput --release
+
+# Cross-framework comparison (requires deps in qdp/benchmark/requirements.txt)
+python qdp/benchmark/benchmark_dataloader_throughput.py --qubits 16 --batches 200 --batch-size 64 --prefetch 16
+```
+
+## Example Output
+
+```
+Generating 12800 samples of 16 qubits...
+  Batch size   : 64
+  Vector length: 65536
+  Batches      : 200
+  Prefetch     : 16
+  Generated 12800 samples
+  PennyLane/Qiskit format: 6400.00 MB
+  Mahout format: 6400.00 MB
+
+======================================================================
+DATALOADER THROUGHPUT BENCHMARK: 16 Qubits, 12800 Samples
+======================================================================
+
+[PennyLane] Full Pipeline (DataLoader -> GPU)...
+  Total Time: 26.1952 s (488.6 vectors/sec)
+
+[Qiskit] Full Pipeline (DataLoader -> GPU)...
+  Total Time: 975.8720 s (13.1 vectors/sec)
+
+[Mahout] Full Pipeline (DataLoader -> GPU)...
+  IO + Encode Time: 115.3920 s
+  Total Time: 115.5840 s (110.7 vectors/sec)
+
+======================================================================
+THROUGHPUT (Higher is Better)
+Samples: 12800, Qubits: 16
+======================================================================
+PennyLane        488.6 vectors/sec
+Mahout           110.7 vectors/sec
+Qiskit            13.1 vectors/sec
+----------------------------------------------------------------------
+Speedup vs PennyLane:       0.23x
+Speedup vs Qiskit:          8.44x
+```
+
+## Notes
+
+- Example numbers reuse prior timings scaled to the default 12.8k vectors; 
re-run on target GPUs for fresh measurements.
+- PennyLane/Qiskit sections include CPU-side state preparation time; Mahout 
timing includes IO + encode on GPU.
+- Install competitor dependencies only if you plan to run their legs; the 
script auto-skips missing frameworks.
+- Flags:
+  - `--qubits`: controls vector length (`2^qubits`).
+  - `--batches`: number of host-side batches to stream.
+  - `--batch-size`: vectors per batch; total samples = `batches * batch-size`.
+  - `--prefetch`: CPU queue depth; higher values help hide slow CPU-side prep 
(e.g., Qiskit state prep) and keep GPU fed.
diff --git a/qdp/benchmark/benchmark_dataloader_throughput.py 
b/qdp/benchmark/benchmark_dataloader_throughput.py
new file mode 100644
index 000000000..bc8a39291
--- /dev/null
+++ b/qdp/benchmark/benchmark_dataloader_throughput.py
@@ -0,0 +1,284 @@
+#!/usr/bin/env python3
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+DataLoader throughput benchmark across Mahout (QDP), PennyLane, and Qiskit.
+
+The workload mirrors the `qdp-core/examples/dataloader_throughput.rs` pipeline:
+- Generate batches of `--batch-size` deterministic vectors each.
+- Prefetch on the CPU side to keep the GPU fed.
+- Encode vectors into amplitude states on GPU and run a tiny consumer op.
+
+Run:
+    python qdp/benchmark/benchmark_dataloader_throughput.py \
+        --qubits 16 --batches 200 --batch-size 64 --prefetch 16
+"""
+
+import argparse
+import queue
+import threading
+import time
+
+import numpy as np
+import torch
+
+from mahout_qdp import QdpEngine
+
+# Horizontal rules used by main() to frame the printed report sections.
+BAR = "=" * 70
+SEP = "-" * 70
+
+# PennyLane is optional: record whether it imported so run_pennylane()
+# can skip its leg instead of failing.
+try:
+    import pennylane as qml
+
+    HAS_PENNYLANE = True
+except ImportError:
+    HAS_PENNYLANE = False
+
+# Qiskit and qiskit_aer are optional as well; run_qiskit() checks this flag.
+try:
+    from qiskit import QuantumCircuit, transpile
+    from qiskit_aer import AerSimulator
+
+    HAS_QISKIT = True
+except ImportError:
+    HAS_QISKIT = False
+
+
+def build_sample(seed: int, vector_len: int) -> np.ndarray:
+    """Return a deterministic float64 vector of ``vector_len`` entries.
+
+    Entry ``i`` is ``((i + seed) & (vector_len - 1)) / vector_len``. The
+    bit mask behaves as a modulo only when ``vector_len`` is a power of
+    two, which is how ``main`` derives it (``1 << qubits``); in that case
+    the result is the ramp ``0, 1/len, 2/len, ...`` rotated by ``seed``.
+    """
+    shifted = np.arange(vector_len, dtype=np.uint64) + np.uint64(seed)
+    wrapped = np.bitwise_and(shifted, np.uint64(vector_len - 1))
+    # Multiply by the reciprocal (rather than divide) to keep the exact
+    # floating-point results of the reference formula.
+    return wrapped.astype(np.float64) * (1.0 / vector_len)
+
+
+def prefetched_batches(
+    total_batches: int, batch_size: int, vector_len: int, prefetch: int
+):
+    """Yield ``total_batches`` batches built by a background thread.
+
+    A daemon producer thread stacks ``batch_size`` vectors from
+    ``build_sample`` into one 2-D array per batch and hands them over
+    through a queue bounded by ``prefetch`` (``queue.Queue`` treats a
+    ``maxsize`` <= 0 as unbounded).
+
+    Failure handling:
+    - An exception raised while producing a batch is forwarded through
+      the queue and re-raised in the consumer, so the caller never blocks
+      forever waiting for a sentinel that will not arrive.
+    - If the consumer stops iterating early, the producer is told to stop
+      instead of staying blocked on a full queue.
+
+    Yields:
+        np.ndarray: one stacked batch per iteration.
+    """
+    q = queue.Queue(maxsize=prefetch)  # batches, None sentinel, or an error
+    stop = threading.Event()
+
+    def put(item) -> bool:
+        # Bounded wait so the producer can notice an abandoned consumer.
+        while not stop.is_set():
+            try:
+                q.put(item, timeout=0.1)
+                return True
+            except queue.Full:
+                continue
+        return False
+
+    def producer():
+        try:
+            for batch_idx in range(total_batches):
+                base = batch_idx * batch_size
+                batch = [
+                    build_sample(base + i, vector_len)
+                    for i in range(batch_size)
+                ]
+                if not put(np.stack(batch)):
+                    return
+            put(None)
+        except BaseException as exc:
+            # Hand the failure to the consumer instead of dying silently.
+            put(exc)
+
+    threading.Thread(target=producer, daemon=True).start()
+
+    try:
+        while True:
+            item = q.get()
+            if item is None:
+                break
+            if isinstance(item, BaseException):
+                raise item
+            yield item
+    finally:
+        stop.set()
+
+
+def normalize_batch(batch: np.ndarray) -> np.ndarray:
+    """Scale every row of ``batch`` to unit L2 norm.
+
+    Rows whose norm is exactly zero are divided by 1.0 instead, so they
+    are returned unchanged rather than turning into NaNs.
+    """
+    row_norms = np.linalg.norm(batch, axis=1, keepdims=True)
+    safe_norms = np.where(row_norms == 0, 1.0, row_norms)
+    return batch / safe_norms
+
+
+def run_mahout(
+    num_qubits: int, total_batches: int, batch_size: int, prefetch: int
+):
+    """Time the Mahout (QDP) leg and return ``(seconds, vectors/sec)``.
+
+    Each prefetched batch is row-normalised on the host, then every
+    vector is passed to ``engine.encode(..., "amplitude")`` as a Python
+    list; the result is imported into torch via DLPack and reduced with
+    ``abs().sum()`` as a small consumer op.
+
+    Returns ``(0.0, 0.0)`` when the engine cannot be constructed.
+    """
+    try:
+        # NOTE(review): 0 is presumably the CUDA device index - confirm
+        # against the QdpEngine API.
+        engine = QdpEngine(0)
+    except Exception as exc:
+        print(f"[Mahout] Init failed: {exc}")
+        return 0.0, 0.0
+
+    vector_len = 1 << num_qubits
+
+    # Synchronise on both sides of the timed region so pending GPU work
+    # is attributed to the right interval.
+    torch.cuda.synchronize()
+    start = time.perf_counter()
+
+    processed = 0
+    batches = prefetched_batches(
+        total_batches, batch_size, vector_len, prefetch
+    )
+    for batch in batches:
+        for sample in normalize_batch(batch):
+            qtensor = engine.encode(sample.tolist(), num_qubits, "amplitude")
+            magnitudes = torch.utils.dlpack.from_dlpack(qtensor).abs()
+            _ = magnitudes.to(torch.float32).sum()
+            processed += 1
+
+    torch.cuda.synchronize()
+    duration = time.perf_counter() - start
+
+    if duration > 0:
+        throughput = processed / duration
+    else:
+        throughput = 0.0
+
+    # Both lines report the same interval; the timer is not split further.
+    print(f"  IO + Encode Time: {duration:.4f} s")
+    print(f"  Total Time: {duration:.4f} s ({throughput:.1f} vectors/sec)")
+    return duration, throughput
+
+
+def run_pennylane(
+    num_qubits: int, total_batches: int, batch_size: int, prefetch: int
+):
+    """Time the PennyLane leg and return ``(seconds, vectors/sec)``.
+
+    Each prefetched batch is converted to a float64 torch tensor and fed
+    to an ``AmplitudeEmbedding`` QNode on ``default.qubit``; the resulting
+    states are copied to CUDA as float32 and reduced with ``abs().sum()``.
+
+    The whole batch is passed to the QNode in one call first. If that
+    call raises, the failure is reported once and every remaining batch
+    is encoded vector by vector, so the failing call is not retried (and
+    timed) for each batch.
+
+    Returns ``(0.0, 0.0)`` when PennyLane is not installed.
+    """
+    if not HAS_PENNYLANE:
+        print("[PennyLane] Not installed, skipping.")
+        return 0.0, 0.0
+
+    dev = qml.device("default.qubit", wires=num_qubits)
+
+    @qml.qnode(dev, interface="torch")
+    def circuit(inputs):
+        qml.AmplitudeEmbedding(
+            features=inputs,
+            wires=range(num_qubits),
+            normalize=True,
+            pad_with=0.0,
+        )
+        return qml.state()
+
+    # Cleared after the first failed whole-batch call.
+    batched_ok = True
+
+    torch.cuda.synchronize()
+    start = time.perf_counter()
+    processed = 0
+
+    for batch in prefetched_batches(
+        total_batches, batch_size, 1 << num_qubits, prefetch
+    ):
+        batch_cpu = torch.tensor(batch, dtype=torch.float64)
+        state_cpu = None
+        if batched_ok:
+            try:
+                state_cpu = circuit(batch_cpu)
+            except Exception as exc:
+                batched_ok = False
+                print(
+                    "  [PennyLane] Batched call failed "
+                    f"({type(exc).__name__}); encoding per sample."
+                )
+        if state_cpu is None:
+            state_cpu = torch.stack([circuit(x) for x in batch_cpu])
+        state_gpu = state_cpu.to("cuda", dtype=torch.float32)
+        _ = state_gpu.abs().sum()
+        processed += len(batch_cpu)
+
+    torch.cuda.synchronize()
+    duration = time.perf_counter() - start
+    throughput = processed / duration if duration > 0 else 0.0
+    print(f"  Total Time: {duration:.4f} s ({throughput:.1f} vectors/sec)")
+    return duration, throughput
+
+
+def run_qiskit(
+    num_qubits: int, total_batches: int, batch_size: int, prefetch: int
+):
+    """Time the Qiskit leg and return ``(seconds, vectors/sec)``.
+
+    Every vector of a row-normalised batch gets its own circuit:
+    ``initialize`` the state, ``save_statevector``, transpile for the
+    Aer statevector simulator, and run it. The statevectors of one batch
+    are then stacked, copied to CUDA as complex64, and reduced with
+    ``abs().sum()``.
+
+    Returns ``(0.0, 0.0)`` when Qiskit / qiskit_aer is not installed.
+    """
+    if not HAS_QISKIT:
+        print("[Qiskit] Not installed, skipping.")
+        return 0.0, 0.0
+
+    backend = AerSimulator(method="statevector")
+    torch.cuda.synchronize()
+    start = time.perf_counter()
+    processed = 0
+
+    for batch in prefetched_batches(
+        total_batches, batch_size, 1 << num_qubits, prefetch
+    ):
+        normalized = normalize_batch(batch)
+
+        batch_states = []
+        for vec in normalized:
+            qc = QuantumCircuit(num_qubits)
+            qc.initialize(vec, range(num_qubits))
+            qc.save_statevector()
+            t_qc = transpile(qc, backend)
+            state = backend.run(t_qc).result().get_statevector().data
+            batch_states.append(state)
+            processed += 1
+
+        gpu_tensor = torch.tensor(
+            np.array(batch_states), device="cuda", dtype=torch.complex64
+        )
+        _ = gpu_tensor.abs().sum()
+
+    torch.cuda.synchronize()
+    duration = time.perf_counter() - start
+    throughput = processed / duration if duration > 0 else 0.0
+    # No leading blank line: keeps the report in step with the other legs.
+    print(f"  Total Time: {duration:.4f} s ({throughput:.1f} vectors/sec)")
+    return duration, throughput
+
+
+def main():
+    """Parse CLI flags, run each framework leg, and print a summary.
+
+    The legs run in the order PennyLane, Qiskit, Mahout. A leg that
+    returns zero throughput (framework missing or init failure) is left
+    out of the ranking and of the speed-up lines.
+
+    ``--qubits``, ``--batches`` and ``--batch-size`` must be >= 1; other
+    values are rejected up front through ``parser.error`` (exit status 2)
+    because they would otherwise crash or stall the producer thread.
+    """
+    parser = argparse.ArgumentParser(
+        description="Benchmark DataLoader throughput across frameworks."
+    )
+    parser.add_argument(
+        "--qubits",
+        type=int,
+        default=16,
+        help="Number of qubits (power-of-two vector length).",
+    )
+    parser.add_argument(
+        "--batches", type=int, default=200, help="Total batches to stream."
+    )
+    parser.add_argument(
+        "--batch-size", type=int, default=64, help="Vectors per batch."
+    )
+    parser.add_argument(
+        "--prefetch", type=int, default=16, help="CPU-side prefetch depth."
+    )
+    args = parser.parse_args()
+
+    for flag, value in (
+        ("--qubits", args.qubits),
+        ("--batches", args.batches),
+        ("--batch-size", args.batch_size),
+    ):
+        if value < 1:
+            parser.error(f"{flag} must be a positive integer (got {value})")
+
+    total_vectors = args.batches * args.batch_size
+    vector_len = 1 << args.qubits
+
+    print(f"Generating {total_vectors} samples of {args.qubits} qubits...")
+    print(f"  Batch size   : {args.batch_size}")
+    print(f"  Vector length: {vector_len}")
+    print(f"  Batches      : {args.batches}")
+    print(f"  Prefetch     : {args.prefetch}")
+    # 8 bytes per float64 element; every leg consumes the same host data.
+    bytes_per_vec = vector_len * 8
+    total_mb = total_vectors * bytes_per_vec / (1024 * 1024)
+    print(f"  Generated {total_vectors} samples")
+    print(f"  PennyLane/Qiskit format: {total_mb:.2f} MB")
+    print(f"  Mahout format: {total_mb:.2f} MB")
+    print()
+
+    print(BAR)
+    print(
+        f"DATALOADER THROUGHPUT BENCHMARK: {args.qubits} Qubits, "
+        f"{total_vectors} Samples"
+    )
+    print(BAR)
+
+    print()
+    print("[PennyLane] Full Pipeline (DataLoader -> GPU)...")
+    t_pl, th_pl = run_pennylane(
+        args.qubits, args.batches, args.batch_size, args.prefetch
+    )
+
+    print()
+    print("[Qiskit] Full Pipeline (DataLoader -> GPU)...")
+    t_qiskit, th_qiskit = run_qiskit(
+        args.qubits, args.batches, args.batch_size, args.prefetch
+    )
+
+    print()
+    print("[Mahout] Full Pipeline (DataLoader -> GPU)...")
+    t_mahout, th_mahout = run_mahout(
+        args.qubits, args.batches, args.batch_size, args.prefetch
+    )
+
+    print()
+    print(BAR)
+    print("THROUGHPUT (Higher is Better)")
+    print(f"Samples: {total_vectors}, Qubits: {args.qubits}")
+    print(BAR)
+
+    throughput_results = []
+    if th_pl > 0:
+        throughput_results.append(("PennyLane", th_pl))
+    if th_qiskit > 0:
+        throughput_results.append(("Qiskit", th_qiskit))
+    if th_mahout > 0:
+        throughput_results.append(("Mahout", th_mahout))
+
+    throughput_results.sort(key=lambda x: x[1], reverse=True)
+
+    for name, tput in throughput_results:
+        print(f"{name:12s} {tput:10.1f} vectors/sec")
+
+    # Speed-ups are only meaningful when the Mahout leg actually ran.
+    if t_mahout > 0:
+        print(SEP)
+        if t_pl > 0:
+            print(f"Speedup vs PennyLane: {th_mahout / th_pl:10.2f}x")
+        if t_qiskit > 0:
+            print(f"Speedup vs Qiskit:    {th_mahout / th_qiskit:10.2f}x")
+
+
+# Run the benchmark only when executed as a script, not when imported.
+if __name__ == "__main__":
+    main()

Reply via email to