This is an automated email from the ASF dual-hosted git repository.
guanmingchiu pushed a commit to branch dev-qdp
in repository https://gitbox.apache.org/repos/asf/mahout.git
The following commit(s) were added to refs/heads/dev-qdp by this push:
new 335036a5c [QDP] [Benchmark] Add a End-to-End benchmark to compare with
speed (#697)
335036a5c is described below
commit 335036a5cc9f79efa049ed404b9b7685857192e8
Author: KUAN-HAO HUANG <[email protected]>
AuthorDate: Mon Dec 8 15:51:04 2025 +0800
[QDP] [Benchmark] Add a End-to-End benchmark to compare with speed (#697)
* [QDP] [Benchmark] Add a End-to-End benchmark to compare with speed
* [QDP] improve memory management
* Revert "[QDP] improve memory management"
This reverts commit d909f3d84b2f8fd1da7394ce93311ec69cc33e34.
---
qdp/benchmark/benchmark_e2e_final.py | 313 +++++++++++++++++++++++++++++++++++
qdp/benchmark/requirements.txt | 10 ++
qdp/qdp-python/src/lib.rs | 28 ++++
3 files changed, 351 insertions(+)
diff --git a/qdp/benchmark/benchmark_e2e_final.py
b/qdp/benchmark/benchmark_e2e_final.py
new file mode 100644
index 000000000..fce668a2c
--- /dev/null
+++ b/qdp/benchmark/benchmark_e2e_final.py
@@ -0,0 +1,313 @@
+#!/usr/bin/env python3
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+FINAL END-TO-END BENCHMARK (Disk -> GPU VRAM).
+
+Scope:
+1. Disk IO: Reading Parquet file.
+2. Preprocessing: L2 Normalization (CPU vs GPU).
+3. Encoding: Quantum State Preparation.
+4. Transfer: Moving data to GPU VRAM.
+5. Consumption: 1 dummy Forward Pass to ensure data is usable.
+
+This is the most realistic comparison for a "Cold Start" Training Epoch.
+"""
+
+import time
+import argparse
+import torch
+import torch.nn as nn
+import numpy as np
+import os
+import pyarrow as pa
+import pyarrow.parquet as pq
+from mahout_qdp import QdpEngine
+
+# Competitors
+try:
+ import pennylane as qml
+
+ HAS_PENNYLANE = True
+except ImportError:
+ HAS_PENNYLANE = False
+
+try:
+ from qiskit import QuantumCircuit, transpile
+ from qiskit_aer import AerSimulator
+
+ HAS_QISKIT = True
+except ImportError:
+ HAS_QISKIT = False
+
+# Config
+DATA_FILE = "final_benchmark_data.parquet"
+HIDDEN_DIM = 16
+BATCH_SIZE = 64 # Small batch to stress loop overhead
+
+
class DummyQNN(nn.Module):
    """Minimal classical head standing in for a quantum-classical model.

    A single linear layer mapping a 2**n_qubits amplitude vector to a small
    feature space; it exists only to force the encoded states through one
    real forward pass so the GPU transfer cannot be optimized away.
    """

    def __init__(self, n_qubits, hidden_dim=None):
        """
        Args:
            n_qubits: Number of qubits; the input width is 2**n_qubits.
            hidden_dim: Output width. Defaults to the module-level
                HIDDEN_DIM when omitted (backward compatible with the
                original single-argument constructor).
        """
        super().__init__()
        out_features = HIDDEN_DIM if hidden_dim is None else hidden_dim
        self.fc = nn.Linear(1 << n_qubits, out_features)

    def forward(self, x):
        """Apply the linear head to a batch of state amplitudes."""
        return self.fc(x)
+
+
def generate_data(n_qubits, n_samples):
    """Write the benchmark input to disk in both competitor formats.

    Creates two Parquet files:
      * DATA_FILE: one ``list<float64>`` column, one row per sample
        (consumed by the PennyLane/Qiskit pipelines).
      * ``*_mahout.parquet``: a flat ``float64`` column written one
        row group per sample (consumed by the Mahout pipeline).

    Pre-existing files are removed first so every run starts cold.

    Args:
        n_qubits: Number of qubits; each sample has 2**n_qubits features.
        n_samples: Number of random samples to generate.
    """
    if os.path.exists(DATA_FILE):
        os.remove(DATA_FILE)

    mahout_data_file = DATA_FILE.replace(".parquet", "_mahout.parquet")
    if os.path.exists(mahout_data_file):
        os.remove(mahout_data_file)

    print(f"Generating {n_samples} samples of {n_qubits} qubits...")
    dim = 1 << n_qubits

    # Generate for PennyLane/Qiskit (list format), chunked to bound memory.
    chunk_size = 500
    schema_list = pa.schema([("feature_vector", pa.list_(pa.float64()))])

    with pq.ParquetWriter(DATA_FILE, schema_list) as writer:
        for start_idx in range(0, n_samples, chunk_size):
            current = min(chunk_size, n_samples - start_idx)
            data = np.random.rand(current, dim).astype(np.float64)
            feature_vectors = [row.tolist() for row in data]
            arrays = pa.array(feature_vectors, type=pa.list_(pa.float64()))
            batch_table = pa.Table.from_arrays([arrays], names=["feature_vector"])
            writer.write_table(batch_table)

    # Generate for Mahout (flat Float64 format). One write_table per sample
    # deliberately produces one row group per sample, which run_mahout
    # relies on when iterating batches.
    schema_flat = pa.schema([("data", pa.float64())])
    with pq.ParquetWriter(mahout_data_file, schema_flat) as writer:
        for _ in range(n_samples):
            sample_data = np.random.rand(dim).astype(np.float64)
            array = pa.array(sample_data, type=pa.float64())
            batch_table = pa.Table.from_arrays([array], names=["data"])
            writer.write_table(batch_table)

    file_size_mb = os.path.getsize(DATA_FILE) / (1024 * 1024)
    mahout_size_mb = os.path.getsize(mahout_data_file) / (1024 * 1024)
    print(f" Generated {n_samples} samples")
    print(f" PennyLane/Qiskit format: {file_size_mb:.2f} MB")
    print(f" Mahout format: {mahout_size_mb:.2f} MB")
+
+
+# -----------------------------------------------------------
+# 1. Qiskit Full Pipeline
+# -----------------------------------------------------------
def run_qiskit(n_qubits, n_samples):
    """Qiskit baseline: Parquet read, L2 normalize, state-prep, GPU transfer.

    Returns the total wall-clock time in seconds, or 0.0 when Qiskit is not
    installed (the summary code treats 0.0 as "skipped").
    """
    if not HAS_QISKIT:
        print("\n[Qiskit] Not installed, skipping.")
        return 0.0

    print("\n[Qiskit] Full Pipeline (Disk -> GPU)...")
    model = DummyQNN(n_qubits).cuda()
    backend = AerSimulator(method="statevector")

    # Drain any pending GPU work so it is not billed to this pipeline.
    torch.cuda.synchronize()
    start_time = time.perf_counter()

    # IO: load the entire dataset into one (n_samples, dim) ndarray.
    import pandas as pd

    df = pd.read_parquet(DATA_FILE)
    raw_data = np.stack(df["feature_vector"].values)
    io_time = time.perf_counter() - start_time
    print(f" IO Time: {io_time:.4f} s")

    # Process batches
    for i in range(0, n_samples, BATCH_SIZE):
        batch = raw_data[i : i + BATCH_SIZE]

        # Normalize each row to unit L2 norm; zero rows are left unscaled
        # to avoid divide-by-zero.
        norms = np.linalg.norm(batch, axis=1, keepdims=True)
        norms[norms == 0] = 1.0
        batch = batch / norms

        # State preparation: one circuit per vector (no circuit batching).
        batch_states = []
        for vec_idx, vec in enumerate(batch):
            qc = QuantumCircuit(n_qubits)
            qc.initialize(vec, range(n_qubits))
            qc.save_statevector()
            t_qc = transpile(qc, backend)
            result = backend.run(t_qc).result().get_statevector().data
            batch_states.append(result)

            # Carriage-return progress line, overwritten in place.
            if (vec_idx + 1) % 10 == 0:
                print(f" Processed {vec_idx + 1}/{len(batch)} vectors...", end="\r")

        # Transfer to GPU and consume with one dummy forward pass.
        gpu_tensor = torch.tensor(
            np.array(batch_states), device="cuda", dtype=torch.complex64
        )
        _ = model(gpu_tensor.abs())

    torch.cuda.synchronize()
    total_time = time.perf_counter() - start_time
    print(f"\n Total Time: {total_time:.4f} s")
    return total_time
+
+
+# -----------------------------------------------------------
+# 2. PennyLane Full Pipeline
+# -----------------------------------------------------------
def run_pennylane(n_qubits, n_samples):
    """PennyLane baseline: Parquet read, AmplitudeEmbedding, GPU transfer.

    Returns the total wall-clock time in seconds, or 0.0 when PennyLane is
    not installed (the summary code treats 0.0 as "skipped").
    """
    if not HAS_PENNYLANE:
        print("\n[PennyLane] Not installed, skipping.")
        return 0.0

    print("\n[PennyLane] Full Pipeline (Disk -> GPU)...")

    dev = qml.device("default.qubit", wires=n_qubits)

    @qml.qnode(dev, interface="torch")
    def circuit(inputs):
        # normalize=True folds the L2 normalization into the embedding,
        # so no explicit normalization pass is needed in this pipeline.
        qml.AmplitudeEmbedding(
            features=inputs, wires=range(n_qubits), normalize=True, pad_with=0.0
        )
        return qml.state()

    model = DummyQNN(n_qubits).cuda()

    # Drain any pending GPU work before starting the clock.
    torch.cuda.synchronize()
    start_time = time.perf_counter()

    # IO: load the entire dataset into one (n_samples, dim) ndarray.
    import pandas as pd

    df = pd.read_parquet(DATA_FILE)
    raw_data = np.stack(df["feature_vector"].values)
    io_time = time.perf_counter() - start_time
    print(f" IO Time: {io_time:.4f} s")

    # Process batches
    for i in range(0, n_samples, BATCH_SIZE):
        batch_cpu = torch.tensor(raw_data[i : i + BATCH_SIZE])

        # Prefer batched QNode execution; fall back to per-sample calls
        # if the device rejects a batched AmplitudeEmbedding.
        try:
            state_cpu = circuit(batch_cpu)
        except Exception:
            state_cpu = torch.stack([circuit(x) for x in batch_cpu])

        # Transfer to GPU and consume with one dummy forward pass.
        state_gpu = state_cpu.to("cuda", dtype=torch.float32)
        _ = model(state_gpu.abs())

    torch.cuda.synchronize()
    total_time = time.perf_counter() - start_time
    print(f" Total Time: {total_time:.4f} s")
    return total_time
+
+
+# -----------------------------------------------------------
+# 3. Mahout Full Pipeline
+# -----------------------------------------------------------
def run_mahout(engine, n_qubits, n_samples):
    """Mahout QDP pipeline: stream Parquet row groups into GPU state vectors.

    Args:
        engine: Initialized QdpEngine bound to a CUDA device.
        n_qubits: Number of qubits per sample.
        n_samples: Total sample count (used to batch the forward pass).

    Returns the total wall-clock time in seconds.
    """
    print("\n[Mahout] Full Pipeline (Disk -> GPU)...")
    model = DummyQNN(n_qubits).cuda()
    MAHOUT_DATA_FILE = DATA_FILE.replace(".parquet", "_mahout.parquet")

    # Drain any pending GPU work before starting the clock.
    torch.cuda.synchronize()
    start_time = time.perf_counter()

    # Read Parquet and encode all samples
    import pyarrow.parquet as pq

    parquet_file = pq.ParquetFile(MAHOUT_DATA_FILE)

    all_states = []
    # generate_data writes one row group per sample, so each iterated
    # batch here holds exactly one sample's flat feature vector.
    for batch in parquet_file.iter_batches():
        sample_data = batch.column(0).to_numpy()
        qtensor = engine.encode(sample_data.tolist(), n_qubits, "amplitude")
        # DLPack hand-off into torch; presumably shares the engine's GPU
        # buffer without a host copy -- confirm against QdpEngine docs.
        gpu_state = torch.from_dlpack(qtensor)
        all_states.append(gpu_state)

    # Stack all per-sample states and convert to real float32 magnitudes.
    gpu_all_data = torch.stack(all_states).abs().to(torch.float32)

    encode_time = time.perf_counter() - start_time
    print(f" IO + Encode Time: {encode_time:.4f} s")

    # Forward pass (data already on GPU)
    for i in range(0, n_samples, BATCH_SIZE):
        batch = gpu_all_data[i : i + BATCH_SIZE]
        _ = model(batch)

    torch.cuda.synchronize()
    total_time = time.perf_counter() - start_time
    print(f" Total Time: {total_time:.4f} s")
    return total_time
+
+
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Final End-to-End Benchmark (Disk -> GPU VRAM)"
    )
    parser.add_argument(
        "--qubits", type=int, default=16, help="Number of qubits (16 recommended)"
    )
    parser.add_argument(
        "--samples", type=int, default=200, help="Number of training samples"
    )
    args = parser.parse_args()

    # Regenerate the datasets so every run measures a true cold start.
    generate_data(args.qubits, args.samples)

    try:
        engine = QdpEngine(0)
    except Exception as e:
        print(f"Mahout Init Error: {e}")
        # SystemExit is a builtin; exit() is injected by the `site` module
        # and is absent under `python -S`. Behavior is identical: exit code 1.
        raise SystemExit(1)

    print("\n" + "=" * 70)
    print(f"E2E BENCHMARK: {args.qubits} Qubits, {args.samples} Samples")
    print("=" * 70)

    # Run benchmarks; each returns 0.0 when its framework is unavailable.
    t_pl = run_pennylane(args.qubits, args.samples)
    t_qiskit = run_qiskit(args.qubits, args.samples)
    t_mahout = run_mahout(engine, args.qubits, args.samples)

    print("\n" + "=" * 70)
    print("E2E LATENCY (Lower is Better)")
    print(f"Samples: {args.samples}, Qubits: {args.qubits}")
    print("=" * 70)

    # Collect only the pipelines that actually ran.
    results = []
    if t_mahout > 0:
        results.append(("Mahout", t_mahout))
    if t_pl > 0:
        results.append(("PennyLane", t_pl))
    if t_qiskit > 0:
        results.append(("Qiskit", t_qiskit))

    # Fastest first.
    results.sort(key=lambda x: x[1])

    for name, time_val in results:
        print(f"{name:12s} {time_val:10.4f} s")

    print("-" * 70)
    if t_mahout > 0:
        if t_pl > 0:
            print(f"Speedup vs PennyLane: {t_pl / t_mahout:10.2f}x")
        if t_qiskit > 0:
            print(f"Speedup vs Qiskit: {t_qiskit / t_mahout:10.2f}x")
diff --git a/qdp/benchmark/requirements.txt b/qdp/benchmark/requirements.txt
new file mode 100644
index 000000000..9c0ab11b2
--- /dev/null
+++ b/qdp/benchmark/requirements.txt
@@ -0,0 +1,10 @@
+numpy>=1.24,<2.0
+pandas>=2.0
+pyarrow>=14.0
+torch>=2.2
+qiskit>=1.0
+qiskit-aer>=0.17.2
+pennylane>=0.35
+scikit-learn>=1.3
+tqdm
+matplotlib
diff --git a/qdp/qdp-python/src/lib.rs b/qdp/qdp-python/src/lib.rs
index 18484cf15..4ff1e4b02 100644
--- a/qdp/qdp-python/src/lib.rs
+++ b/qdp/qdp-python/src/lib.rs
@@ -180,6 +180,34 @@ impl QdpEngine {
consumed: false,
})
}
+
+ /// Load data from Parquet file and encode into quantum state
+ ///
+ /// **ZERO-COPY**: Reads Parquet chunks directly without intermediate Vec
allocation.
+ ///
+ /// Args:
+ /// path: Path to Parquet file
+ /// num_qubits: Number of qubits for encoding
+ /// encoding_method: Encoding strategy ("amplitude", "angle", or
"basis")
+ ///
+ /// Returns:
+ /// QuantumTensor: DLPack-compatible tensor for zero-copy PyTorch
integration
+ ///
+ /// Raises:
+ /// RuntimeError: If encoding fails
+ ///
+ /// Example:
+ /// >>> engine = QdpEngine(device_id=0)
+ /// >>> qtensor = engine.encode_from_parquet("data.parquet",
num_qubits=2, encoding_method="amplitude")
+ /// >>> torch_tensor = torch.from_dlpack(qtensor)
+ fn encode_from_parquet(&self, path: &str, num_qubits: usize,
encoding_method: &str) -> PyResult<QuantumTensor> {
+ let ptr = self.engine.encode_from_parquet(path, num_qubits,
encoding_method)
+ .map_err(|e| PyRuntimeError::new_err(format!("Encoding from
parquet failed: {}", e)))?;
+ Ok(QuantumTensor {
+ ptr,
+ consumed: false,
+ })
+ }
}
/// Mahout QDP Python module