This is an automated email from the ASF dual-hosted git repository. guanmingchiu pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/mahout.git
commit 3175ef9311b27013c34db14d70e8d6027f254b40 Author: Guan-Ming (Wesley) Chiu <[email protected]> AuthorDate: Tue Dec 9 19:04:59 2025 +0800 [QDP] add batch kernel support (#700) * [QDP] Add batch encoding support * Refactor batch pre-processing --- qdp/benchmark/benchmark_e2e_final.py | 158 +++++++++++++++++------ qdp/qdp-core/src/error.rs | 3 + qdp/qdp-core/src/gpu/encodings/amplitude.rs | 191 +++++++++++----------------- qdp/qdp-core/src/gpu/encodings/mod.rs | 22 ++-- qdp/qdp-core/src/gpu/memory.rs | 42 ++++++ qdp/qdp-core/src/io.rs | 97 +++++++++++++- qdp/qdp-core/src/lib.rs | 53 +++++--- qdp/qdp-core/src/preprocessing.rs | 55 ++++++++ qdp/qdp-kernels/src/amplitude.cu | 121 ++++++++++++++++++ qdp/qdp-kernels/src/lib.rs | 15 +++ qdp/qdp-python/src/lib.rs | 19 +-- 11 files changed, 583 insertions(+), 193 deletions(-) diff --git a/qdp/benchmark/benchmark_e2e_final.py b/qdp/benchmark/benchmark_e2e_final.py index fce668a2c..27efab93d 100644 --- a/qdp/benchmark/benchmark_e2e_final.py +++ b/qdp/benchmark/benchmark_e2e_final.py @@ -34,6 +34,7 @@ import torch import torch.nn as nn import numpy as np import os +import itertools import pyarrow as pa import pyarrow.parquet as pq from mahout_qdp import QdpEngine @@ -73,10 +74,6 @@ def generate_data(n_qubits, n_samples): if os.path.exists(DATA_FILE): os.remove(DATA_FILE) - MAHOUT_DATA_FILE = DATA_FILE.replace(".parquet", "_mahout.parquet") - if os.path.exists(MAHOUT_DATA_FILE): - os.remove(MAHOUT_DATA_FILE) - print(f"Generating {n_samples} samples of {n_qubits} qubits...") dim = 1 << n_qubits @@ -93,20 +90,9 @@ def generate_data(n_qubits, n_samples): batch_table = pa.Table.from_arrays([arrays], names=["feature_vector"]) writer.write_table(batch_table) - # Generate for Mahout (flat Float64 format, one sample per batch) - schema_flat = pa.schema([("data", pa.float64())]) - with pq.ParquetWriter(MAHOUT_DATA_FILE, schema_flat) as writer: - for i in range(n_samples): - sample_data = np.random.rand(dim).astype(np.float64) - array = pa.array(sample_data, type=pa.float64()) - batch_table = pa.Table.from_arrays([array], names=["data"]) - writer.write_table(batch_table) - file_size_mb = os.path.getsize(DATA_FILE) / (1024 * 1024) - mahout_size_mb = os.path.getsize(MAHOUT_DATA_FILE) / (1024 * 1024) print(f" Generated {n_samples} samples") - print(f" PennyLane/Qiskit format: {file_size_mb:.2f} MB") - print(f" Mahout format: {mahout_size_mb:.2f} MB") + print(f" Parquet file size: {file_size_mb:.2f} MB") # ----------------------------------------------------------- @@ -115,7 +101,7 @@ def generate_data(n_qubits, n_samples): def run_qiskit(n_qubits, n_samples): if not HAS_QISKIT: print("\n[Qiskit] Not installed, skipping.") - return 0.0 + return 0.0, None print("\n[Qiskit] Full Pipeline (Disk -> GPU)...") model = DummyQNN(n_qubits).cuda() @@ -132,6 +118,8 @@ def run_qiskit(n_qubits, n_samples): io_time = time.perf_counter() - start_time print(f" IO Time: {io_time:.4f} s") + all_qiskit_states = [] + # Process batches for i in range(0, n_samples, BATCH_SIZE): batch = raw_data[i : i + BATCH_SIZE] @@ -158,12 +146,15 @@ def run_qiskit(n_qubits, n_samples): gpu_tensor = torch.tensor( np.array(batch_states), device="cuda", dtype=torch.complex64 ) + all_qiskit_states.append(gpu_tensor) _ = model(gpu_tensor.abs()) torch.cuda.synchronize() total_time = time.perf_counter() - start_time print(f"\n Total Time: {total_time:.4f} s") - return total_time + + all_qiskit_tensor = torch.cat(all_qiskit_states, dim=0) + return total_time, all_qiskit_tensor # ----------------------------------------------------------- @@ -172,7 +163,7 @@ def run_qiskit(n_qubits, n_samples): def run_pennylane(n_qubits, n_samples): if not HAS_PENNYLANE: print("\n[PennyLane] Not installed, skipping.") - return 0.0 + return 0.0, None print("\n[PennyLane] Full Pipeline (Disk -> GPU)...") @@ -198,6 +189,8 @@ def run_pennylane(n_qubits, n_samples): io_time = time.perf_counter() - start_time print(f" IO Time: {io_time:.4f} s") + all_pl_states = [] + # Process batches for i in range(0, n_samples, BATCH_SIZE): batch_cpu = torch.tensor(raw_data[i : i + BATCH_SIZE]) @@ -208,6 +201,8 @@ def run_pennylane(n_qubits, n_samples): except Exception: state_cpu = torch.stack([circuit(x) for x in batch_cpu]) + all_pl_states.append(state_cpu) + # Transfer to GPU state_gpu = state_cpu.to("cuda", dtype=torch.float32) _ = model(state_gpu.abs()) @@ -215,7 +210,13 @@ def run_pennylane(n_qubits, n_samples): torch.cuda.synchronize() total_time = time.perf_counter() - start_time print(f" Total Time: {total_time:.4f} s") - return total_time + + # Stack all collected states + all_pl_states_tensor = torch.cat( + all_pl_states, dim=0 + ) # Should handle cases where last batch is smaller + + return total_time, all_pl_states_tensor # ----------------------------------------------------------- @@ -224,28 +225,31 @@ def run_pennylane(n_qubits, n_samples): def run_mahout(engine, n_qubits, n_samples): print("\n[Mahout] Full Pipeline (Disk -> GPU)...") model = DummyQNN(n_qubits).cuda() - MAHOUT_DATA_FILE = DATA_FILE.replace(".parquet", "_mahout.parquet") torch.cuda.synchronize() start_time = time.perf_counter() - # Read Parquet and encode all samples - import pyarrow.parquet as pq - - parquet_file = pq.ParquetFile(MAHOUT_DATA_FILE) + # Direct Parquet to GPU pipeline + parquet_encode_start = time.perf_counter() + batched_tensor = engine.encode_from_parquet(DATA_FILE, n_qubits, "amplitude") + parquet_encode_time = time.perf_counter() - parquet_encode_start + print(f" Parquet->GPU (IO+Encode): {parquet_encode_time:.4f} s") - all_states = [] - for batch in parquet_file.iter_batches(): - sample_data = batch.column(0).to_numpy() - qtensor = engine.encode(sample_data.tolist(), n_qubits, "amplitude") - gpu_state = torch.from_dlpack(qtensor) - all_states.append(gpu_state) + # Convert to torch tensor (single DLPack call) + dlpack_start = time.perf_counter() + gpu_batched = torch.from_dlpack(batched_tensor) + dlpack_time = time.perf_counter() - dlpack_start + print(f" DLPack conversion: {dlpack_time:.4f} s") - # Stack and convert - gpu_all_data = torch.stack(all_states).abs().to(torch.float32) + # Reshape to [n_samples, state_len] (still complex) + state_len = 1 << n_qubits + gpu_reshaped = gpu_batched.view(n_samples, state_len) - encode_time = time.perf_counter() - start_time - print(f" IO + Encode Time: {encode_time:.4f} s") + # Convert to float for model (batch already on GPU) + reshape_start = time.perf_counter() + gpu_all_data = gpu_reshaped.abs().to(torch.float32) + reshape_time = time.perf_counter() - reshape_start + print(f" Reshape & convert: {reshape_time:.4f} s") # Forward pass (data already on GPU) for i in range(0, n_samples, BATCH_SIZE): @@ -255,7 +259,46 @@ def run_mahout(engine, n_qubits, n_samples): torch.cuda.synchronize() total_time = time.perf_counter() - start_time print(f" Total Time: {total_time:.4f} s") - return total_time + return total_time, gpu_reshaped + + +def compare_states(name_a, states_a, name_b, states_b): + print("\n" + "=" * 70) + print(f"VERIFICATION ({name_a} vs {name_b})") + print("=" * 70) + + # Ensure both tensors are on GPU for comparison + n_compare = min(len(states_a), len(states_b)) + tensor_a = states_a[:n_compare].cuda() + tensor_b = states_b[:n_compare].cuda() + + # Compare Probabilities (|psi|^2) + diff_probs = (tensor_a.abs() ** 2 - tensor_b.abs() ** 2).abs().max().item() + print(f"Max Probability Difference: {diff_probs:.2e}") + + # Compare Raw Amplitudes + # We compare full complex difference magnitude + diff_amps = (tensor_a - tensor_b).abs().max().item() + print(f"Max Amplitude Difference: {diff_amps:.2e}") + + if diff_probs < 1e-5: + print(">> SUCCESS: Quantum States Match!") + else: + print(">> FAILURE: States do not match.") + + +def verify_correctness(states_dict): + # Filter out None values + valid_states = { + name: states for name, states in states_dict.items() if states is not None + } + + if len(valid_states) < 2: + return + + keys = sorted(list(valid_states.keys())) + for name_a, name_b in itertools.combinations(keys, 2): + compare_states(name_a, valid_states[name_a], name_b, valid_states[name_b]) if __name__ == "__main__": @@ -268,8 +311,26 @@ if __name__ == "__main__": parser.add_argument( "--samples", type=int, default=200, help="Number of training samples" ) + parser.add_argument( + "--frameworks", + nargs="+", + default=["mahout", "pennylane"], + choices=["mahout", "pennylane", "qiskit", "all"], + help="Frameworks to benchmark (default: mahout pennylane). Use 'all' to run all available frameworks.", + ) args = parser.parse_args() + # Expand "all" option + if "all" in args.frameworks: + all_frameworks = [] + if "mahout" in args.frameworks or "all" in args.frameworks: + all_frameworks.append("mahout") + if "pennylane" in args.frameworks or "all" in args.frameworks: + all_frameworks.append("pennylane") + if "qiskit" in args.frameworks or "all" in args.frameworks: + all_frameworks.append("qiskit") + args.frameworks = all_frameworks + generate_data(args.qubits, args.samples) try: @@ -282,10 +343,20 @@ if __name__ == "__main__": print(f"E2E BENCHMARK: {args.qubits} Qubits, {args.samples} Samples") print("=" * 70) + # Initialize results + t_pl, pl_all_states = 0.0, None + t_mahout, mahout_all_states = 0.0, None + t_qiskit, qiskit_all_states = 0.0, None + # Run benchmarks - t_pl = run_pennylane(args.qubits, args.samples) - t_qiskit = run_qiskit(args.qubits, args.samples) - t_mahout = run_mahout(engine, args.qubits, args.samples) + if "pennylane" in args.frameworks: + t_pl, pl_all_states = run_pennylane(args.qubits, args.samples) + + if "qiskit" in args.frameworks: + t_qiskit, qiskit_all_states = run_qiskit(args.qubits, args.samples) + + if "mahout" in args.frameworks: + t_mahout, mahout_all_states = run_mahout(engine, args.qubits, args.samples) print("\n" + "=" * 70) print("E2E LATENCY (Lower is Better)") @@ -311,3 +382,12 @@ if __name__ == "__main__": print(f"Speedup vs PennyLane: {t_pl / t_mahout:10.2f}x") if t_qiskit > 0: print(f"Speedup vs Qiskit: {t_qiskit / t_mahout:10.2f}x") + + # Run Verification after benchmarks + verify_correctness( + { + "Mahout": mahout_all_states, + "PennyLane": pl_all_states, + "Qiskit": qiskit_all_states, + } + ) diff --git a/qdp/qdp-core/src/error.rs b/qdp/qdp-core/src/error.rs index 5cfaabedf..5d7adfbd0 100644 --- a/qdp/qdp-core/src/error.rs +++ b/qdp/qdp-core/src/error.rs @@ -36,6 +36,9 @@ pub enum MahoutError { #[error("I/O error: {0}")] Io(String), + + #[error("Not implemented: {0}")] + NotImplemented(String), } /// Result type alias for Mahout operations diff --git a/qdp/qdp-core/src/gpu/encodings/amplitude.rs b/qdp/qdp-core/src/gpu/encodings/amplitude.rs index 6c4277125..ff1490c48 100644 --- a/qdp/qdp-core/src/gpu/encodings/amplitude.rs +++ b/qdp/qdp-core/src/gpu/encodings/amplitude.rs @@ -17,7 +17,7 @@ // Amplitude encoding: direct state injection with L2 normalization use std::sync::Arc; -use arrow::array::{Array, Float64Array}; + use cudarc::driver::CudaDevice; use crate::error::{MahoutError, Result}; use crate::gpu::memory::GpuStateVector; @@ -29,7 +29,7 @@ use std::ffi::c_void; #[cfg(target_os = "linux")] use cudarc::driver::DevicePtr; #[cfg(target_os = "linux")] -use qdp_kernels::launch_amplitude_encode; +use qdp_kernels::{launch_amplitude_encode, launch_amplitude_encode_batch}; #[cfg(target_os = "linux")] use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error}; @@ -133,140 +133,97 @@ impl QuantumEncoder for AmplitudeEncoder { } } - fn name(&self) -> &'static str { - "amplitude" - } - - fn description(&self) -> &'static str { - "Amplitude encoding with L2 normalization" - } - - /// Override to avoid intermediate Vec allocation. Processes chunks directly to GPU offsets. - fn encode_chunked( + /// Encode multiple samples in a single GPU allocation and kernel launch + #[cfg(target_os = "linux")] + fn encode_batch( &self, device: &Arc<CudaDevice>, - chunks: &[Float64Array], + batch_data: &[f64], + num_samples: usize, + sample_size: usize, num_qubits: usize, ) -> Result<GpuStateVector> { - #[cfg(target_os = "linux")] - { - let total_len: usize = chunks.iter().map(|c| c.len()).sum(); - let state_len = 1 << num_qubits; + crate::profile_scope!("AmplitudeEncoder::encode_batch"); - if total_len == 0 { - return Err(MahoutError::InvalidInput("Input chunks cannot be empty".to_string())); - } - if total_len > state_len { - return Err(MahoutError::InvalidInput( - format!("Total data length {} exceeds state vector size {}", total_len, state_len) - )); - } - if num_qubits == 0 || num_qubits > 30 { - return Err(MahoutError::InvalidInput( - format!("Number of qubits {} must be between 1 and 30", num_qubits) - )); - } + // Validate inputs using shared preprocessor + Preprocessor::validate_batch(batch_data, num_samples, sample_size, num_qubits)?; - let state_vector = { - crate::profile_scope!("GPU::Alloc"); - GpuStateVector::new(device, num_qubits)? - }; - - // Require pre-processed data (no nulls) - for chunk in chunks { - if chunk.null_count() > 0 { - return Err(MahoutError::InvalidInput( - format!("Chunk contains {} null values. Data must be pre-processed before encoding", chunk.null_count()) - )); - } - } + let state_len = 1 << num_qubits; - let norm = { - crate::profile_scope!("CPU::L2Norm"); - let mut norm_sq = 0.0; - for chunk in chunks { - norm_sq += chunk.values().iter().map(|&x| x * x).sum::<f64>(); - } - let norm = norm_sq.sqrt(); - if norm == 0.0 { - return Err(MahoutError::InvalidInput("Input data has zero norm".to_string())); - } - norm + // Calculate L2 norms using shared preprocessor (parallelized) + let norms = Preprocessor::calculate_batch_l2_norms(batch_data, num_samples, sample_size)?; + + // Convert to inverse norms + let inv_norms: Vec<f64> = norms.iter().map(|n| 1.0 / n).collect(); + + // Allocate single large GPU buffer for all states + let batch_state_vector = { + crate::profile_scope!("GPU::AllocBatch"); + GpuStateVector::new_batch(device, num_samples, num_qubits)? + }; + + // Upload input data to GPU + let input_batch_gpu = { + crate::profile_scope!("GPU::H2D_InputBatch"); + device.htod_sync_copy(batch_data) + .map_err(|e| MahoutError::MemoryAllocation( + format!("Failed to upload batch input: {:?}", e) + ))? + }; + + // Upload inverse norms to GPU + let inv_norms_gpu = { + crate::profile_scope!("GPU::H2D_Norms"); + device.htod_sync_copy(&inv_norms) + .map_err(|e| MahoutError::MemoryAllocation( + format!("Failed to upload norms: {:?}", e) + ))? + }; + + // Launch batch kernel + { + crate::profile_scope!("GPU::BatchKernelLaunch"); + let ret = unsafe { + launch_amplitude_encode_batch( + *input_batch_gpu.device_ptr() as *const f64, + batch_state_vector.ptr() as *mut c_void, + *inv_norms_gpu.device_ptr() as *const f64, + num_samples, + sample_size, + state_len, + std::ptr::null_mut(), // default stream + ) }; - let mut current_offset = 0; - for chunk in chunks { - let chunk_len = chunk.len(); - if chunk_len == 0 { - continue; - } - - let state_ptr_offset = unsafe { - state_vector.ptr().cast::<u8>() - .add(current_offset * std::mem::size_of::<qdp_kernels::CuDoubleComplex>()) - .cast::<std::ffi::c_void>() - }; - - let chunk_slice = { - crate::profile_scope!("GPU::ChunkH2DCopy"); - // Zero-copy from Arrow buffer to GPU - device.htod_sync_copy(chunk.values()) - .map_err(|e| MahoutError::MemoryAllocation(format!("Failed to copy chunk: {:?}", e)))? - }; - - { - crate::profile_scope!("GPU::KernelLaunch"); - let ret = unsafe { - launch_amplitude_encode( - *chunk_slice.device_ptr() as *const f64, - state_ptr_offset, - chunk_len, - state_len, - norm, - std::ptr::null_mut(), - ) - }; - if ret != 0 { - return Err(MahoutError::KernelLaunch( - format!("Kernel launch failed: {} ({})", ret, cuda_error_to_string(ret)) - )); - } - } - - current_offset += chunk_len; - } - - if total_len < state_len { - let padding_bytes = (state_len - total_len) * std::mem::size_of::<qdp_kernels::CuDoubleComplex>(); - let tail_ptr = unsafe { state_vector.ptr().add(total_len) as *mut c_void }; - - unsafe { - unsafe extern "C" { - fn cudaMemsetAsync(devPtr: *mut c_void, value: i32, count: usize, stream: *mut c_void) -> i32; - } - let result = cudaMemsetAsync(tail_ptr, 0, padding_bytes, std::ptr::null_mut()); - if result != 0 { - return Err(MahoutError::Cuda( - format!("Failed to zero-fill padding: {} ({})", result, cuda_error_to_string(result)) - )); - } - } + if ret != 0 { + return Err(MahoutError::KernelLaunch( + format!("Batch kernel launch failed: {} ({})", ret, cuda_error_to_string(ret)) + )); } + } + // Synchronize + { + crate::profile_scope!("GPU::Synchronize"); device.synchronize() .map_err(|e| MahoutError::Cuda(format!("Sync failed: {:?}", e)))?; - - Ok(state_vector) } - #[cfg(not(target_os = "linux"))] - { - Err(MahoutError::Cuda("CUDA unavailable (non-Linux)".to_string())) - } + Ok(batch_state_vector) + } + + fn name(&self) -> &'static str { + "amplitude" + } + + fn description(&self) -> &'static str { + "Amplitude encoding with L2 normalization" } } impl AmplitudeEncoder { + + /// Async pipeline encoding for large data (SSS-tier optimization) /// /// Uses the generic dual-stream pipeline infrastructure to overlap diff --git a/qdp/qdp-core/src/gpu/encodings/mod.rs b/qdp/qdp-core/src/gpu/encodings/mod.rs index 539355c2f..7273179d0 100644 --- a/qdp/qdp-core/src/gpu/encodings/mod.rs +++ b/qdp/qdp-core/src/gpu/encodings/mod.rs @@ -17,7 +17,7 @@ // Quantum encoding strategies (Strategy Pattern) use std::sync::Arc; -use arrow::array::Float64Array; + use cudarc::driver::CudaDevice; use crate::error::Result; use crate::gpu::memory::GpuStateVector; @@ -34,18 +34,18 @@ pub trait QuantumEncoder: Send + Sync { num_qubits: usize, ) -> Result<GpuStateVector>; - /// Encode from chunked Arrow arrays - /// - /// Default implementation flattens chunks. (TODO: Encoders can override for true zero-copy.) - fn encode_chunked( + /// Encode multiple samples in a single GPU allocation and kernel launch (Batch Encoding) + fn encode_batch( &self, - device: &Arc<CudaDevice>, - chunks: &[Float64Array], - num_qubits: usize, + _device: &Arc<CudaDevice>, + _batch_data: &[f64], + _num_samples: usize, + _sample_size: usize, + _num_qubits: usize, ) -> Result<GpuStateVector> { - // Default: flatten and use regular encode - let data = crate::io::arrow_to_vec_chunked(chunks); - self.encode(device, &data, num_qubits) + Err(crate::error::MahoutError::NotImplemented( + format!("Batch encoding not implemented for {}", self.name()) + )) } /// Validate input data before encoding diff --git a/qdp/qdp-core/src/gpu/memory.rs b/qdp/qdp-core/src/gpu/memory.rs index 1ac8eabb5..a333e103d 100644 --- a/qdp/qdp-core/src/gpu/memory.rs +++ b/qdp/qdp-core/src/gpu/memory.rs @@ -220,4 +220,46 @@ impl GpuStateVector { pub fn size_elements(&self) -> usize { self.size_elements } + + /// Create GPU state vector for a batch of samples + /// Allocates num_samples * 2^qubits complex numbers on GPU + pub fn new_batch(_device: &Arc<CudaDevice>, num_samples: usize, qubits: usize) -> Result<Self> { + let single_state_size: usize = 1usize << qubits; + let total_elements = num_samples.checked_mul(single_state_size) + .ok_or_else(|| MahoutError::MemoryAllocation( + format!("Batch size overflow: {} samples * {} elements", num_samples, single_state_size) + ))?; + + #[cfg(target_os = "linux")] + { + let requested_bytes = total_elements + .checked_mul(std::mem::size_of::<CuDoubleComplex>()) + .ok_or_else(|| MahoutError::MemoryAllocation( + format!("Requested GPU allocation size overflow (elements={})", total_elements) + ))?; + + // Pre-flight check + ensure_device_memory_available(requested_bytes, "batch state vector allocation", Some(qubits))?; + + let slice = unsafe { + _device.alloc::<CuDoubleComplex>(total_elements) + }.map_err(|e| map_allocation_error( + requested_bytes, + "batch state vector allocation", + Some(qubits), + e, + ))?; + + Ok(Self { + buffer: Arc::new(GpuBufferRaw { slice }), + num_qubits: qubits, + size_elements: total_elements, + }) + } + + #[cfg(not(target_os = "linux"))] + { + Err(MahoutError::Cuda("CUDA is only available on Linux. This build does not support GPU operations.".to_string())) + } + } } diff --git a/qdp/qdp-core/src/io.rs b/qdp/qdp-core/src/io.rs index fc9f09cd4..93ad1d018 100644 --- a/qdp/qdp-core/src/io.rs +++ b/qdp/qdp-core/src/io.rs @@ -22,7 +22,7 @@ use std::fs::File; use std::path::Path; use std::sync::Arc; -use arrow::array::{Array, ArrayRef, Float64Array, RecordBatch}; +use arrow::array::{Array, ArrayRef, Float64Array, ListArray, RecordBatch}; use arrow::datatypes::{DataType, Field, Schema}; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use parquet::arrow::ArrowWriter; @@ -270,3 +270,98 @@ pub fn write_arrow_to_parquet<P: AsRef<Path>>( Ok(()) } + +/// Read batch data from Parquet file with list column format +/// +/// Efficiently reads Parquet files where each row contains a list of values. +/// Returns a flattened Vec with all samples concatenated, suitable for batch encoding. +/// +/// # Arguments +/// * `path` - Path to Parquet file +/// +/// # Returns +/// Tuple of (flattened_data, num_samples, sample_size) +/// +/// # Example +/// File format: column "feature_vector" with type List<Float64> +/// Each row = one sample = one list of floats +pub fn read_parquet_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize, usize)> { + let file = File::open(path.as_ref()).map_err(|e| { + MahoutError::Io(format!("Failed to open Parquet file: {}", e)) + })?; + + let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| { + MahoutError::Io(format!("Failed to create Parquet reader: {}", e)) + })?; + + let mut reader = builder.build().map_err(|e| { + MahoutError::Io(format!("Failed to build Parquet reader: {}", e)) + })?; + + let mut all_data = Vec::new(); + let mut num_samples = 0; + let mut sample_size = None; + + while let Some(batch_result) = reader.next() { + let batch = batch_result.map_err(|e| { + MahoutError::Io(format!("Failed to read Parquet batch: {}", e)) + })?; + + if batch.num_columns() == 0 { + return Err(MahoutError::Io("Parquet file has no columns".to_string())); + } + + let column = batch.column(0); + + // Handle List<Float64> column type + if let DataType::List(_) = column.data_type() { + let list_array = column + .as_any() + .downcast_ref::<ListArray>() + .ok_or_else(|| MahoutError::Io("Failed to downcast to ListArray".to_string()))?; + + for i in 0..list_array.len() { + let value_array = list_array.value(i); + let float_array = value_array + .as_any() + .downcast_ref::<Float64Array>() + .ok_or_else(|| MahoutError::Io("List values must be Float64".to_string()))?; + + let current_size = float_array.len(); + + // Verify all samples have the same size + if let Some(expected_size) = sample_size { + if current_size != expected_size { + return Err(MahoutError::InvalidInput(format!( + "Inconsistent sample sizes: expected {}, got {}", + expected_size, current_size + ))); + } + } else { + sample_size = Some(current_size); + all_data.reserve(current_size * 100); // Reserve space + } + + // Efficiently copy the values + if float_array.null_count() == 0 { + all_data.extend_from_slice(float_array.values()); + } else { + all_data.extend(float_array.iter().map(|opt| opt.unwrap_or(0.0))); + } + + num_samples += 1; + } + } else { + return Err(MahoutError::Io(format!( + "Expected List<Float64> column, got {:?}", + column.data_type() + ))); + } + } + + let sample_size = sample_size.ok_or_else(|| { + MahoutError::Io("Parquet file contains no data".to_string()) + })?; + + Ok((all_data, num_samples, sample_size)) +} diff --git a/qdp/qdp-core/src/lib.rs b/qdp/qdp-core/src/lib.rs index 406715edd..65c08bd02 100644 --- a/qdp/qdp-core/src/lib.rs +++ b/qdp/qdp-core/src/lib.rs @@ -26,7 +26,7 @@ mod profiling; pub use error::{MahoutError, Result}; use std::sync::Arc; -use arrow::array::Float64Array; + use cudarc::driver::CudaDevice; use crate::dlpack::DLManagedTensor; use crate::gpu::get_encoder; @@ -88,40 +88,55 @@ impl QdpEngine { &self.device } - /// Encode from chunked Arrow arrays (zero-copy from Parquet) + /// Encode multiple samples in a single fused kernel (most efficient) + /// + /// Allocates one large GPU buffer and launches a single batch kernel. + /// This is faster than encode_batch() as it reduces allocation and kernel launch overhead. /// /// # Arguments - /// * `chunks` - Chunked Arrow Float64Arrays (from read_parquet_to_arrow) + /// * `batch_data` - Flattened batch data (all samples concatenated) + /// * `num_samples` - Number of samples in the batch + /// * `sample_size` - Size of each sample /// * `num_qubits` - Number of qubits - /// * `encoding_method` - Strategy: "amplitude", "angle", or "basis" + /// * `encoding_method` - Strategy (currently only "amplitude" supported for batch) /// /// # Returns - /// DLPack pointer for zero-copy PyTorch integration - pub fn encode_chunked( + /// Single DLPack pointer containing all encoded states (shape: [num_samples, 2^num_qubits]) + pub fn encode_batch( &self, - chunks: &[Float64Array], + batch_data: &[f64], + num_samples: usize, + sample_size: usize, num_qubits: usize, encoding_method: &str, ) -> Result<*mut DLManagedTensor> { - crate::profile_scope!("Mahout::EncodeChunked"); + crate::profile_scope!("Mahout::EncodeBatch"); let encoder = get_encoder(encoding_method)?; - let state_vector = encoder.encode_chunked(&self.device, chunks, num_qubits)?; - let dlpack_ptr = { - crate::profile_scope!("Mahout::CreateDLPack"); - state_vector.to_dlpack() - }; + let state_vector = encoder.encode_batch( + &self.device, + batch_data, + num_samples, + sample_size, + num_qubits, + )?; + + let dlpack_ptr = state_vector.to_dlpack(); Ok(dlpack_ptr) } /// Load data from Parquet file and encode into quantum state /// - /// **ZERO-COPY**: Reads Parquet chunks directly without intermediate Vec allocation. + /// Reads Parquet file with List<Float64> column format and encodes all samples + /// in a single batch operation. Bypasses pandas for maximum performance. /// /// # Arguments /// * `path` - Path to Parquet file /// * `num_qubits` - Number of qubits /// * `encoding_method` - Strategy: "amplitude", "angle", or "basis" + /// + /// # Returns + /// Single DLPack pointer containing all encoded states (shape: [num_samples, 2^num_qubits]) pub fn encode_from_parquet( &self, path: &str, @@ -130,8 +145,14 @@ impl QdpEngine { ) -> Result<*mut DLManagedTensor> { crate::profile_scope!("Mahout::EncodeFromParquet"); - let chunks = crate::io::read_parquet_to_arrow(path)?; - self.encode_chunked(&chunks, num_qubits, encoding_method) + // Read Parquet directly using Arrow (faster than pandas) + let (batch_data, num_samples, sample_size) = { + crate::profile_scope!("IO::ReadParquetBatch"); + crate::io::read_parquet_batch(path)? + }; + + // Encode using fused batch kernel + self.encode_batch(&batch_data, num_samples, sample_size, num_qubits, encoding_method) } } diff --git a/qdp/qdp-core/src/preprocessing.rs b/qdp/qdp-core/src/preprocessing.rs index 233548a0f..0d8e70148 100644 --- a/qdp/qdp-core/src/preprocessing.rs +++ b/qdp/qdp-core/src/preprocessing.rs @@ -76,4 +76,59 @@ impl Preprocessor { Ok(norm) } + + /// Validates input constraints for batch processing. + pub fn validate_batch( + batch_data: &[f64], + num_samples: usize, + sample_size: usize, + num_qubits: usize, + ) -> Result<()> { + if batch_data.len() != num_samples * sample_size { + return Err(MahoutError::InvalidInput( + format!("Batch data length {} doesn't match num_samples {} * sample_size {}", + batch_data.len(), num_samples, sample_size) + )); + } + + if num_qubits == 0 || num_qubits > 30 { + return Err(MahoutError::InvalidInput( + format!("Number of qubits {} must be between 1 and 30", num_qubits) + )); + } + + let state_len = 1 << num_qubits; + if sample_size > state_len { + return Err(MahoutError::InvalidInput( + format!("Sample size {} exceeds state vector size {}", sample_size, state_len) + )); + } + + Ok(()) + } + + /// Calculates L2 norms for a batch of samples in parallel. + pub fn calculate_batch_l2_norms( + batch_data: &[f64], + _num_samples: usize, + sample_size: usize, + ) -> Result<Vec<f64>> { + crate::profile_scope!("CPU::BatchL2Norm"); + + // Process chunks in parallel using rayon + batch_data + .par_chunks(sample_size) + .enumerate() + .map(|(i, sample)| { + let norm_sq: f64 = sample.iter().map(|&x| x * x).sum(); + let norm = norm_sq.sqrt(); + if norm == 0.0 { + return Err(MahoutError::InvalidInput( + format!("Sample {} has zero norm", i) + )); + } + Ok(norm) + }) + .collect() + } } diff --git a/qdp/qdp-kernels/src/amplitude.cu b/qdp/qdp-kernels/src/amplitude.cu index b1cd06fea..4dd80fb0b 100644 --- a/qdp/qdp-kernels/src/amplitude.cu +++ b/qdp/qdp-kernels/src/amplitude.cu @@ -110,6 +110,127 @@ int launch_amplitude_encode( return (int)cudaGetLastError(); } +/// Optimized batch amplitude encoding kernel +/// +/// Memory Layout (row-major): +/// - input_batch: [sample0_data | sample1_data | ... | sampleN_data] +/// - state_batch: [sample0_state | sample1_state | ... | sampleN_state] +/// +/// Optimizations: +/// 1. Vectorized double2 loads for 128-bit memory transactions +/// 2. Grid-stride loop for arbitrary batch sizes +/// 3. Coalesced memory access within warps +/// 4. Minimized register pressure +__global__ void amplitude_encode_batch_kernel( + const double* __restrict__ input_batch, + cuDoubleComplex* __restrict__ state_batch, + const double* __restrict__ inv_norms, + size_t num_samples, + size_t input_len, + size_t state_len +) { + // Grid-stride loop pattern for flexibility + const size_t elements_per_sample = state_len / 2; // Each thread handles 2 elements + const size_t total_work = num_samples * elements_per_sample; + const size_t stride = gridDim.x * blockDim.x; + + size_t global_idx = blockIdx.x * blockDim.x + threadIdx.x; + + // Process elements in grid-stride fashion + for (size_t idx = global_idx; idx < total_work; idx += stride) { + // Decompose linear index into (sample, element_pair) + const size_t sample_idx = idx / elements_per_sample; + const size_t elem_pair = idx % elements_per_sample; + + // Calculate base addresses (strength-reduced) + const size_t input_base = sample_idx * input_len; + const size_t state_base = sample_idx * state_len; + const size_t elem_offset = elem_pair * 2; + + // Load inverse norm (cached by L1) + const double inv_norm = inv_norms[sample_idx]; + + // Vectorized load: read 2 doubles as double2 for 128-bit transaction + double v1, v2; + if (elem_offset + 1 < input_len) { + // Aligned vectorized load + const double2 vec_data = __ldg(reinterpret_cast<const double2*>(input_batch + input_base) + elem_pair); + v1 = vec_data.x; + v2 = vec_data.y; + } else if (elem_offset < input_len) { + // Edge case: single element load + v1 = __ldg(input_batch + input_base + elem_offset); + v2 = 0.0; + } else { + // Padding region + v1 = v2 = 0.0; + } + + // Normalize and write as complex numbers + // Compiler will optimize multiplications + const cuDoubleComplex c1 = make_cuDoubleComplex(v1 * inv_norm, 0.0); + const cuDoubleComplex c2 = make_cuDoubleComplex(v2 * inv_norm, 0.0); + + // Write to global memory (coalesced within warp) + state_batch[state_base + elem_offset] = c1; + if (elem_offset + 1 < state_len) { + state_batch[state_base + elem_offset + 1] = c2; + } + } +} + +/// Launch optimized batch amplitude encoding kernel +/// +/// # Arguments +/// * input_batch_d - Device pointer to batch input data +/// * state_batch_d - Device pointer to output batch state vectors +/// * inv_norms_d - Device pointer to inverse norms array +/// * num_samples - Number of samples in batch +/// * input_len - Elements per sample +/// * state_len - State vector size per sample (2^num_qubits) +/// * stream - CUDA stream for async execution +/// +/// # Returns +/// CUDA error code (0 = cudaSuccess) +int launch_amplitude_encode_batch( + const double* input_batch_d, + void* state_batch_d, + const double* inv_norms_d, + size_t num_samples, + size_t input_len, + size_t state_len, + cudaStream_t stream +) { + if (num_samples == 0 || state_len == 0) { + return cudaErrorInvalidValue; + } + + cuDoubleComplex* state_complex_d = static_cast<cuDoubleComplex*>(state_batch_d); + + // Optimal configuration for modern GPUs (SM 7.0+) + // - Block size: 256 threads (8 warps, good occupancy) + // - Grid size: Enough blocks to saturate GPU, but not excessive + const int blockSize = 256; + const size_t total_work = num_samples * (state_len / 2); + + // Calculate grid size: aim for high occupancy without too many blocks + // Limit to reasonable number of blocks to avoid scheduler overhead + const size_t blocks_needed = (total_work + blockSize - 1) / blockSize; + const size_t max_blocks = 2048; // Reasonable limit for most GPUs + const size_t gridSize = (blocks_needed < max_blocks) ? blocks_needed : max_blocks; + + amplitude_encode_batch_kernel<<<gridSize, blockSize, 0, stream>>>( + input_batch_d, + state_complex_d, + inv_norms_d, + num_samples, + input_len, + state_len + ); + + return (int)cudaGetLastError(); +} + // TODO: Future encoding methods: // - launch_angle_encode (angle encoding) // - launch_basis_encode (basis encoding) diff --git a/qdp/qdp-kernels/src/lib.rs b/qdp/qdp-kernels/src/lib.rs index a59733fb8..c1bb07544 100644 --- a/qdp/qdp-kernels/src/lib.rs +++ b/qdp/qdp-kernels/src/lib.rs @@ -53,6 +53,21 @@ unsafe extern "C" { stream: *mut c_void, ) -> i32; + /// Launch batch amplitude encoding kernel + /// Returns CUDA error code (0 = success) + /// + /// # Safety + /// Requires valid GPU pointers, must sync before freeing + pub fn launch_amplitude_encode_batch( + input_batch_d: *const f64, + state_batch_d: *mut c_void, + inv_norms_d: *const f64, + num_samples: usize, + input_len: usize, + state_len: usize, + stream: *mut c_void, + ) -> i32; + // TODO: launch_angle_encode, launch_basis_encode } diff --git a/qdp/qdp-python/src/lib.rs b/qdp/qdp-python/src/lib.rs index 4ff1e4b02..cd3fcc3ac 100644 --- a/qdp/qdp-python/src/lib.rs +++ b/qdp/qdp-python/src/lib.rs @@ -181,25 +181,26 @@ impl QdpEngine { }) } - /// Load data from Parquet file and encode into quantum state + /// Encode from Parquet file (FASTEST - recommended for batches) /// - /// **ZERO-COPY**: Reads Parquet chunks directly without intermediate Vec allocation. + /// Direct Parquet→GPU pipeline: + /// - Reads List<Float64> column format using Arrow + /// - Zero-copy data extraction + /// - Single optimized batch kernel launch + /// - Returns batched tensor (shape: [num_samples, 2^num_qubits]) /// /// Args: /// path: Path to Parquet file /// num_qubits: Number of qubits for encoding - /// encoding_method: Encoding strategy ("amplitude", "angle", or "basis") + /// encoding_method: Encoding strategy (currently only "amplitude") /// /// Returns: - /// QuantumTensor: DLPack-compatible tensor for zero-copy PyTorch integration - /// - /// Raises: - /// RuntimeError: If encoding fails + /// QuantumTensor: DLPack tensor containing all encoded states /// /// Example: /// >>> engine = QdpEngine(device_id=0) - /// >>> qtensor = engine.encode_from_parquet("data.parquet", num_qubits=2, encoding_method="amplitude") - /// >>> torch_tensor = torch.from_dlpack(qtensor) + /// >>> batched = engine.encode_from_parquet("data.parquet", 16, "amplitude") + /// >>> torch_tensor = torch.from_dlpack(batched) # Shape: [200, 65536] fn encode_from_parquet(&self, path: &str, num_qubits: usize, encoding_method: &str) -> PyResult<QuantumTensor> { let ptr = self.engine.encode_from_parquet(path, num_qubits, encoding_method) .map_err(|e| PyRuntimeError::new_err(format!("Encoding from parquet failed: {}", e)))?;
