This is an automated email from the ASF dual-hosted git repository. guanmingchiu pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/mahout.git
commit 230abcd3e8872c517d6af6b595df0c79871f7419
Author: KUAN-HAO HUANG <[email protected]>
AuthorDate: Mon Dec 8 15:51:04 2025 +0800

    [QDP] [Benchmark] Add an End-to-End benchmark to compare speed (#697)

    * [QDP] [Benchmark] Add an End-to-End benchmark to compare speed

    * [QDP] improve memory management

    * Revert "[QDP] improve memory management"

    This reverts commit d909f3d84b2f8fd1da7394ce93311ec69cc33e34.
---
 qdp/benchmark/benchmark_e2e_final.py | 341 +++++++++++++++++++++++++++++++++++
 qdp/benchmark/requirements.txt       |  10 ++
 qdp/qdp-python/src/lib.rs            |  28 ++++
 3 files changed, 379 insertions(+)

diff --git a/qdp/benchmark/benchmark_e2e_final.py b/qdp/benchmark/benchmark_e2e_final.py
new file mode 100644
index 000000000..fce668a2c
--- /dev/null
+++ b/qdp/benchmark/benchmark_e2e_final.py
@@ -0,0 +1,341 @@
+#!/usr/bin/env python3
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+FINAL END-TO-END BENCHMARK (Disk -> GPU VRAM).
+
+Scope:
+1. Disk IO: Reading Parquet file.
+2. Preprocessing: L2 Normalization (CPU vs GPU).
+3. Encoding: Quantum State Preparation.
+4. Transfer: Moving data to GPU VRAM.
+5. Consumption: 1 dummy Forward Pass to ensure data is usable.
+
+This is the most realistic comparison for a "Cold Start" Training Epoch.
+"""
+
+import argparse
+import os
+import sys
+import time
+
+import numpy as np
+import pandas as pd
+import pyarrow as pa
+import pyarrow.parquet as pq
+import torch
+import torch.nn as nn
+
+from mahout_qdp import QdpEngine
+
+# Competitors
+try:
+    import pennylane as qml
+
+    HAS_PENNYLANE = True
+except ImportError:
+    HAS_PENNYLANE = False
+
+try:
+    from qiskit import QuantumCircuit, transpile
+    from qiskit_aer import AerSimulator
+
+    HAS_QISKIT = True
+except ImportError:
+    HAS_QISKIT = False
+
+# Config
+DATA_FILE = "final_benchmark_data.parquet"
+MAHOUT_DATA_FILE = DATA_FILE.replace(".parquet", "_mahout.parquet")
+HIDDEN_DIM = 16
+BATCH_SIZE = 64  # Small batch to stress loop overhead
+
+
+class DummyQNN(nn.Module):
+    """Single linear layer mapping 2**n_qubits features to HIDDEN_DIM outputs."""
+
+    def __init__(self, n_qubits):
+        super().__init__()
+        self.fc = nn.Linear(1 << n_qubits, HIDDEN_DIM)
+
+    def forward(self, x):
+        return self.fc(x)
+
+
+def generate_data(n_qubits, n_samples):
+    """Write the same random samples to both benchmark Parquet files.
+
+    DATA_FILE holds one ``list<float64>`` row per sample (PennyLane/Qiskit).
+    MAHOUT_DATA_FILE holds a flat ``float64`` column written one sample at a
+    time (Mahout). Both files come from the same random draws so every
+    framework encodes identical inputs.
+    """
+    for path in (DATA_FILE, MAHOUT_DATA_FILE):
+        if os.path.exists(path):
+            os.remove(path)
+
+    print(f"Generating {n_samples} samples of {n_qubits} qubits...")
+    dim = 1 << n_qubits
+    chunk_size = 500
+
+    schema_list = pa.schema([("feature_vector", pa.list_(pa.float64()))])
+    schema_flat = pa.schema([("data", pa.float64())])
+
+    with pq.ParquetWriter(DATA_FILE, schema_list) as list_writer:
+        with pq.ParquetWriter(MAHOUT_DATA_FILE, schema_flat) as flat_writer:
+            for start_idx in range(0, n_samples, chunk_size):
+                current = min(chunk_size, n_samples - start_idx)
+                data = np.random.rand(current, dim).astype(np.float64)
+
+                # PennyLane/Qiskit format: one list-valued row per sample.
+                vectors = pa.array(
+                    [row.tolist() for row in data], type=pa.list_(pa.float64())
+                )
+                list_writer.write_table(
+                    pa.Table.from_arrays([vectors], names=["feature_vector"])
+                )
+
+                # Mahout format: one flat write per sample, i.e. each sample
+                # is a contiguous run of 2**n_qubits rows.
+                for row in data:
+                    flat = pa.array(row, type=pa.float64())
+                    flat_writer.write_table(
+                        pa.Table.from_arrays([flat], names=["data"])
+                    )
+
+    file_size_mb = os.path.getsize(DATA_FILE) / (1024 * 1024)
+    mahout_size_mb = os.path.getsize(MAHOUT_DATA_FILE) / (1024 * 1024)
+    print(f" Generated {n_samples} samples")
+    print(f" PennyLane/Qiskit format: {file_size_mb:.2f} MB")
+    print(f" Mahout format: {mahout_size_mb:.2f} MB")
+
+
+# -----------------------------------------------------------
+# 1. Qiskit Full Pipeline
+# -----------------------------------------------------------
+def run_qiskit(n_qubits, n_samples):
+    """Time Disk -> normalize -> Aer statevector -> GPU -> forward pass.
+
+    Returns the elapsed wall-clock seconds, or 0.0 if Qiskit is not installed.
+    """
+    if not HAS_QISKIT:
+        print("\n[Qiskit] Not installed, skipping.")
+        return 0.0
+
+    print("\n[Qiskit] Full Pipeline (Disk -> GPU)...")
+    model = DummyQNN(n_qubits).cuda()
+    backend = AerSimulator(method="statevector")
+
+    torch.cuda.synchronize()
+    start_time = time.perf_counter()
+
+    # IO (pandas is imported at module level so its import cost is not timed)
+    df = pd.read_parquet(DATA_FILE)
+    raw_data = np.stack(df["feature_vector"].values)
+    io_time = time.perf_counter() - start_time
+    print(f" IO Time: {io_time:.4f} s")
+
+    # Process batches
+    for i in range(0, n_samples, BATCH_SIZE):
+        batch = raw_data[i : i + BATCH_SIZE]
+
+        # Normalize
+        norms = np.linalg.norm(batch, axis=1, keepdims=True)
+        norms[norms == 0] = 1.0
+        batch = batch / norms
+
+        # State preparation
+        batch_states = []
+        for vec_idx, vec in enumerate(batch):
+            qc = QuantumCircuit(n_qubits)
+            qc.initialize(vec, range(n_qubits))
+            qc.save_statevector()
+            t_qc = transpile(qc, backend)
+            result = backend.run(t_qc).result().get_statevector().data
+            batch_states.append(result)
+
+            # Report overall progress, not the position within this batch.
+            done = i + vec_idx + 1
+            if done % 10 == 0:
+                print(f" Processed {done}/{n_samples} vectors...", end="\r")
+
+        # Transfer to GPU
+        gpu_tensor = torch.tensor(
+            np.array(batch_states), device="cuda", dtype=torch.complex64
+        )
+        _ = model(gpu_tensor.abs())
+
+    torch.cuda.synchronize()
+    total_time = time.perf_counter() - start_time
+    print(f"\n Total Time: {total_time:.4f} s")
+    return total_time
+
+
+# -----------------------------------------------------------
+# 2. PennyLane Full Pipeline
+# -----------------------------------------------------------
+def run_pennylane(n_qubits, n_samples):
+    """Time Disk -> AmplitudeEmbedding (default.qubit) -> GPU -> forward pass.
+
+    Returns the elapsed wall-clock seconds, or 0.0 if PennyLane is not installed.
+    """
+    if not HAS_PENNYLANE:
+        print("\n[PennyLane] Not installed, skipping.")
+        return 0.0
+
+    print("\n[PennyLane] Full Pipeline (Disk -> GPU)...")
+
+    dev = qml.device("default.qubit", wires=n_qubits)
+
+    @qml.qnode(dev, interface="torch")
+    def circuit(inputs):
+        qml.AmplitudeEmbedding(
+            features=inputs, wires=range(n_qubits), normalize=True, pad_with=0.0
+        )
+        return qml.state()
+
+    model = DummyQNN(n_qubits).cuda()
+
+    torch.cuda.synchronize()
+    start_time = time.perf_counter()
+
+    # IO (pandas is imported at module level so its import cost is not timed)
+    df = pd.read_parquet(DATA_FILE)
+    raw_data = np.stack(df["feature_vector"].values)
+    io_time = time.perf_counter() - start_time
+    print(f" IO Time: {io_time:.4f} s")
+
+    # Process batches
+    for i in range(0, n_samples, BATCH_SIZE):
+        batch_cpu = torch.tensor(raw_data[i : i + BATCH_SIZE])
+
+        # Execute QNode (batched); fall back to one call per sample if the
+        # batched call raises.
+        try:
+            state_cpu = circuit(batch_cpu)
+        except Exception:
+            state_cpu = torch.stack([circuit(x) for x in batch_cpu])
+
+        # Transfer to GPU as complex and take magnitudes there, as the Qiskit
+        # path does; casting the complex state straight to float32 would
+        # discard the imaginary part before abs().
+        state_gpu = state_cpu.to("cuda", dtype=torch.complex64)
+        _ = model(state_gpu.abs())
+
+    torch.cuda.synchronize()
+    total_time = time.perf_counter() - start_time
+    print(f" Total Time: {total_time:.4f} s")
+    return total_time
+
+
+# -----------------------------------------------------------
+# 3. Mahout Full Pipeline
+# -----------------------------------------------------------
+def run_mahout(engine, n_qubits, n_samples):
+    """Time Disk -> QdpEngine amplitude encoding -> forward pass on GPU.
+
+    Returns the elapsed wall-clock seconds.
+    """
+    print("\n[Mahout] Full Pipeline (Disk -> GPU)...")
+    model = DummyQNN(n_qubits).cuda()
+    dim = 1 << n_qubits
+
+    torch.cuda.synchronize()
+    start_time = time.perf_counter()
+
+    # Read Parquet and encode all samples. Ask for batches of exactly one
+    # sample (2**n_qubits values): pyarrow's default batch_size of 65536
+    # rows only equals one sample when n_qubits == 16.
+    parquet_file = pq.ParquetFile(MAHOUT_DATA_FILE)
+
+    all_states = []
+    for batch in parquet_file.iter_batches(batch_size=dim):
+        sample_data = batch.column(0).to_numpy()
+        qtensor = engine.encode(sample_data.tolist(), n_qubits, "amplitude")
+        gpu_state = torch.from_dlpack(qtensor)
+        all_states.append(gpu_state)
+
+    # Stack and convert
+    gpu_all_data = torch.stack(all_states).abs().to(torch.float32)
+
+    encode_time = time.perf_counter() - start_time
+    print(f" IO + Encode Time: {encode_time:.4f} s")
+
+    # Forward pass (data already on GPU)
+    for i in range(0, n_samples, BATCH_SIZE):
+        batch = gpu_all_data[i : i + BATCH_SIZE]
+        _ = model(batch)
+
+    torch.cuda.synchronize()
+    total_time = time.perf_counter() - start_time
+    print(f" Total Time: {total_time:.4f} s")
+    return total_time
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        description="Final End-to-End Benchmark (Disk -> GPU VRAM)"
+    )
+    parser.add_argument(
+        "--qubits", type=int, default=16, help="Number of qubits (16 recommended)"
+    )
+    parser.add_argument(
+        "--samples", type=int, default=200, help="Number of training samples"
+    )
+    args = parser.parse_args()
+
+    generate_data(args.qubits, args.samples)
+
+    try:
+        engine = QdpEngine(0)
+    except Exception as e:
+        print(f"Mahout Init Error: {e}")
+        sys.exit(1)
+
+    print("\n" + "=" * 70)
+    print(f"E2E BENCHMARK: {args.qubits} Qubits, {args.samples} Samples")
+    print("=" * 70)
+
+    # Run benchmarks
+    t_pl = run_pennylane(args.qubits, args.samples)
+    t_qiskit = run_qiskit(args.qubits, args.samples)
+    t_mahout = run_mahout(engine, args.qubits, args.samples)
+
+    print("\n" + "=" * 70)
+    print("E2E LATENCY (Lower is Better)")
+    print(f"Samples: {args.samples}, Qubits: {args.qubits}")
+    print("=" * 70)
+
+    results = []
+    if t_mahout > 0:
+        results.append(("Mahout", t_mahout))
+    if t_pl > 0:
+        results.append(("PennyLane", t_pl))
+    if t_qiskit > 0:
+        results.append(("Qiskit", t_qiskit))
+
+    results.sort(key=lambda x: x[1])
+
+    for name, time_val in results:
+        print(f"{name:12s} {time_val:10.4f} s")
+
+    print("-" * 70)
+    if t_mahout > 0:
+        if t_pl > 0:
+            print(f"Speedup vs PennyLane: {t_pl / t_mahout:10.2f}x")
+        if t_qiskit > 0:
+            print(f"Speedup vs Qiskit: {t_qiskit / t_mahout:10.2f}x")
diff --git a/qdp/benchmark/requirements.txt b/qdp/benchmark/requirements.txt
new file mode 100644
index 000000000..9c0ab11b2
--- /dev/null
+++ b/qdp/benchmark/requirements.txt
@@ -0,0 +1,10 @@
+numpy>=1.24,<2.0
+pandas>=2.0
+pyarrow>=14.0
+torch>=2.2
+qiskit>=1.0
+qiskit-aer>=0.17.2
+pennylane>=0.35
+scikit-learn>=1.3
+tqdm
+matplotlib
diff --git a/qdp/qdp-python/src/lib.rs b/qdp/qdp-python/src/lib.rs
index 18484cf15..4ff1e4b02 100644
--- a/qdp/qdp-python/src/lib.rs
+++ b/qdp/qdp-python/src/lib.rs
@@ -180,6 +180,34 @@ impl QdpEngine {
             consumed: false,
         })
     }
+
+    /// Load data from Parquet file and encode into quantum state
+    ///
+    /// **ZERO-COPY**: Reads Parquet chunks directly without intermediate Vec allocation.
+    ///
+    /// Args:
+    ///     path: Path to Parquet file
+    ///     num_qubits: Number of qubits for encoding
+    ///     encoding_method: Encoding strategy ("amplitude", "angle", or "basis")
+    ///
+    /// Returns:
+    ///     QuantumTensor: DLPack-compatible tensor for zero-copy PyTorch integration
+    ///
+    /// Raises:
+    ///     RuntimeError: If encoding fails
+    ///
+    /// Example:
+    ///     >>> engine = QdpEngine(device_id=0)
+    ///     >>> qtensor = engine.encode_from_parquet("data.parquet", num_qubits=2, encoding_method="amplitude")
+    ///     >>> torch_tensor = torch.from_dlpack(qtensor)
+    fn encode_from_parquet(&self, path: &str, num_qubits: usize, encoding_method: &str) -> PyResult<QuantumTensor> {
+        let ptr = self.engine.encode_from_parquet(path, num_qubits, encoding_method)
+            .map_err(|e| PyRuntimeError::new_err(format!("Encoding from parquet failed: {}", e)))?;
+        Ok(QuantumTensor {
+            ptr,
+            consumed: false,
+        })
+    }
 }
 
 /// Mahout QDP Python module
