This is an automated email from the ASF dual-hosted git repository.

guanmingchiu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/mahout.git
commit 36b1872d0455ee6fe39cd93c87cda9707cda3704 Author: KUAN-HAO HUANG <[email protected]> AuthorDate: Thu Dec 18 16:35:53 2025 +0800 [QDP] improve memory management (#708) * DataLoader-Benchmark Signed-off-by: 400Ping <[email protected]> * update output Signed-off-by: 400Ping <[email protected]> * update Signed-off-by: 400Ping <[email protected]> * [Fix] Remove print in Qiskit Signed-off-by: 400Ping <[email protected]> * fix Signed-off-by: 400Ping <[email protected]> * fix pre-commit Signed-off-by: 400Ping <[email protected]> * [QDP] improve memory management * fix pre-commit Signed-off-by: 400Ping <[email protected]> * update doc Signed-off-by: 400Ping <[email protected]> * update doc Signed-off-by: 400Ping <[email protected]> * improve memory usage * improve pool * improve for memory * fix back * merge and improve * fix error * memory release * follow suggestions * Address reviewer feedback: safety checks, error handling, and code cleanup * Revert e2e Benchmark * refactor ffi * remove redundant check * fix error --------- Signed-off-by: 400Ping <[email protected]> Co-authored-by: 400Ping <[email protected]> --- qdp/qdp-core/src/gpu/cuda_ffi.rs | 52 ++++++ qdp/qdp-core/src/gpu/encodings/amplitude.rs | 15 +- qdp/qdp-core/src/gpu/memory.rs | 87 +++++++++- qdp/qdp-core/src/gpu/mod.rs | 6 + qdp/qdp-core/src/gpu/pipeline.rs | 102 ++++++++++-- qdp/qdp-core/src/io.rs | 241 ++++++++++++++++++++++++++++ qdp/qdp-core/src/lib.rs | 205 +++++++++++++++++++++-- qdp/qdp-core/tests/arrow_ipc_io.rs | 4 +- qdp/qdp-core/tests/common/mod.rs | 3 +- 9 files changed, 666 insertions(+), 49 deletions(-) diff --git a/qdp/qdp-core/src/gpu/cuda_ffi.rs b/qdp/qdp-core/src/gpu/cuda_ffi.rs new file mode 100644 index 000000000..b61b4e4b2 --- /dev/null +++ b/qdp/qdp-core/src/gpu/cuda_ffi.rs @@ -0,0 +1,52 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Centralized CUDA Runtime API FFI declarations. 
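+//!
+//! These raw bindings are meant to be wrapped in safe helpers. A minimal
+//! sketch for querying device memory (mirrors `query_cuda_mem_info` in
+//! `gpu/memory.rs`):
+//!
+//! ```ignore
+//! let (mut free, mut total) = (0usize, 0usize);
+//! let ret = unsafe { cudaMemGetInfo(&mut free, &mut total) };
+//! assert_eq!(ret, 0, "cudaMemGetInfo failed with code {}", ret);
+//! ```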
+ +#![cfg(target_os = "linux")] + +use std::ffi::c_void; + +pub(crate) const CUDA_MEMCPY_HOST_TO_DEVICE: u32 = 1; +pub(crate) const CUDA_EVENT_DISABLE_TIMING: u32 = 0x02; + +unsafe extern "C" { + pub(crate) fn cudaHostAlloc(pHost: *mut *mut c_void, size: usize, flags: u32) -> i32; + pub(crate) fn cudaFreeHost(ptr: *mut c_void) -> i32; + + pub(crate) fn cudaMemGetInfo(free: *mut usize, total: *mut usize) -> i32; + + pub(crate) fn cudaMemcpyAsync( + dst: *mut c_void, + src: *const c_void, + count: usize, + kind: u32, + stream: *mut c_void, + ) -> i32; + + pub(crate) fn cudaEventCreateWithFlags(event: *mut *mut c_void, flags: u32) -> i32; + pub(crate) fn cudaEventRecord(event: *mut c_void, stream: *mut c_void) -> i32; + pub(crate) fn cudaEventDestroy(event: *mut c_void) -> i32; + pub(crate) fn cudaStreamWaitEvent(stream: *mut c_void, event: *mut c_void, flags: u32) -> i32; + pub(crate) fn cudaStreamSynchronize(stream: *mut c_void) -> i32; + + pub(crate) fn cudaMemsetAsync( + devPtr: *mut c_void, + value: i32, + count: usize, + stream: *mut c_void, + ) -> i32; +} diff --git a/qdp/qdp-core/src/gpu/encodings/amplitude.rs b/qdp/qdp-core/src/gpu/encodings/amplitude.rs index 715f0984e..70b38895e 100644 --- a/qdp/qdp-core/src/gpu/encodings/amplitude.rs +++ b/qdp/qdp-core/src/gpu/encodings/amplitude.rs @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Amplitude encoding: direct state injection with L2 normalization +// Amplitude encoding: state injection with L2 normalization use std::sync::Arc; @@ -27,6 +27,8 @@ use super::QuantumEncoder; #[cfg(target_os = "linux")] use std::ffi::c_void; #[cfg(target_os = "linux")] +use crate::gpu::cuda_ffi::cudaMemsetAsync; +#[cfg(target_os = "linux")] use cudarc::driver::{DevicePtr, DevicePtrMut}; #[cfg(target_os = "linux")] use qdp_kernels::{ @@ -279,7 +281,7 @@ impl QuantumEncoder for AmplitudeEncoder { impl AmplitudeEncoder { - /// Async pipeline encoding for large data (SSS-tier optimization) + /// Async pipeline encoding for large data /// /// Uses the generic dual-stream pipeline infrastructure to overlap /// data transfer and computation. 
The pipeline handles all the @@ -359,15 +361,6 @@ impl AmplitudeEncoder { // Zero-fill padding region using CUDA Runtime API // Use default stream since pipeline streams are already synchronized unsafe { - unsafe extern "C" { - fn cudaMemsetAsync( - devPtr: *mut c_void, - value: i32, - count: usize, - stream: *mut c_void, - ) -> i32; - } - let result = cudaMemsetAsync( tail_ptr, 0, diff --git a/qdp/qdp-core/src/gpu/memory.rs b/qdp/qdp-core/src/gpu/memory.rs index 72bf5bc7f..26e7b1383 100644 --- a/qdp/qdp-core/src/gpu/memory.rs +++ b/qdp/qdp-core/src/gpu/memory.rs @@ -26,6 +26,9 @@ pub enum Precision { Float64, } +#[cfg(target_os = "linux")] +use crate::gpu::cuda_ffi::{cudaFreeHost, cudaHostAlloc, cudaMemGetInfo}; + #[cfg(target_os = "linux")] fn bytes_to_mib(bytes: usize) -> f64 { bytes as f64 / (1024.0 * 1024.0) @@ -45,10 +48,6 @@ fn cuda_error_to_string(code: i32) -> &'static str { #[cfg(target_os = "linux")] fn query_cuda_mem_info() -> Result<(usize, usize)> { unsafe { - unsafe extern "C" { - fn cudaMemGetInfo(free: *mut usize, total: *mut usize) -> i32; - } - let mut free_bytes: usize = 0; let mut total_bytes: usize = 0; let result = cudaMemGetInfo(&mut free_bytes as *mut usize, &mut total_bytes as *mut usize); @@ -379,3 +378,83 @@ impl GpuStateVector { } } } + +// === Pinned Memory Implementation === + +/// Pinned Host Memory Buffer (Page-Locked) +/// +/// Enables DMA for H2D copies, doubling bandwidth and reducing CPU usage. +#[cfg(target_os = "linux")] +pub struct PinnedBuffer { + ptr: *mut f64, + size_elements: usize, +} + +#[cfg(target_os = "linux")] +impl PinnedBuffer { + /// Allocate pinned memory + pub fn new(elements: usize) -> Result<Self> { + unsafe { + let bytes = elements + .checked_mul(std::mem::size_of::<f64>()) + .ok_or_else(|| MahoutError::MemoryAllocation( + format!("Requested pinned buffer allocation size overflow (elements={})", elements) + ))?; + let mut ptr: *mut c_void = std::ptr::null_mut(); + + let ret = cudaHostAlloc(&mut ptr, bytes, 0); // cudaHostAllocDefault + + if ret != 0 { + return Err(MahoutError::MemoryAllocation( + format!("cudaHostAlloc failed with error code: {}", ret) + )); + } + + Ok(Self { + ptr: ptr as *mut f64, + size_elements: elements, + }) + } + } + + /// Get mutable slice to write data into + pub fn as_slice_mut(&mut self) -> &mut [f64] { + unsafe { std::slice::from_raw_parts_mut(self.ptr, self.size_elements) } + } + + /// Get raw pointer for CUDA memcpy + pub fn ptr(&self) -> *const f64 { + self.ptr + } + + pub fn len(&self) -> usize { + self.size_elements + } + + pub fn is_empty(&self) -> bool { + self.size_elements == 0 + } +} + +#[cfg(target_os = "linux")] +impl Drop for PinnedBuffer { + fn drop(&mut self) { + unsafe { + let result = cudaFreeHost(self.ptr as *mut c_void); + if result != 0 { + eprintln!( + "Warning: cudaFreeHost failed with error code {} ({})", + result, + cuda_error_to_string(result) + ); + } + } + } +} + +// Safety: Pinned memory is accessible from any thread +#[cfg(target_os = "linux")] +unsafe impl Send for PinnedBuffer {} + +#[cfg(target_os = "linux")] +unsafe impl Sync for PinnedBuffer {} diff --git a/qdp/qdp-core/src/gpu/mod.rs b/qdp/qdp-core/src/gpu/mod.rs index fe7cdace0..77b41f6e8 100644 --- a/qdp/qdp-core/src/gpu/mod.rs +++ b/qdp/qdp-core/src/gpu/mod.rs @@ -18,6 +18,12 @@ pub mod memory; pub mod encodings; pub mod pipeline; +#[cfg(target_os = "linux")] +pub(crate) mod cuda_ffi; + pub use memory::GpuStateVector; pub use encodings::{QuantumEncoder, AmplitudeEncoder, AngleEncoder, BasisEncoder, get_encoder}; pub 
use pipeline::run_dual_stream_pipeline; + +#[cfg(target_os = "linux")] +pub use pipeline::PipelineContext; diff --git a/qdp/qdp-core/src/gpu/pipeline.rs b/qdp/qdp-core/src/gpu/pipeline.rs index 3c5921c38..bbe74c7b0 100644 --- a/qdp/qdp-core/src/gpu/pipeline.rs +++ b/qdp/qdp-core/src/gpu/pipeline.rs @@ -24,7 +24,94 @@ use std::ffi::c_void; use cudarc::driver::{CudaDevice, CudaSlice, DevicePtr, safe::CudaStream}; use crate::error::{MahoutError, Result}; #[cfg(target_os = "linux")] -use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error}; +use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error, PinnedBuffer}; + +#[cfg(target_os = "linux")] +use crate::gpu::cuda_ffi::{ + cudaEventCreateWithFlags, cudaEventDestroy, cudaEventRecord, cudaMemcpyAsync, cudaStreamSynchronize, + cudaStreamWaitEvent, CUDA_EVENT_DISABLE_TIMING, CUDA_MEMCPY_HOST_TO_DEVICE, +}; + +/// Dual-stream pipeline context: manages compute/copy streams and sync events +#[cfg(target_os = "linux")] +pub struct PipelineContext { + pub stream_compute: CudaStream, + pub stream_copy: CudaStream, + event_copy_done: *mut c_void, +} + +#[cfg(target_os = "linux")] +impl PipelineContext { + /// Create dual streams and sync event + pub fn new(device: &Arc<CudaDevice>) -> Result<Self> { + let stream_compute = device.fork_default_stream() + .map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?; + let stream_copy = device.fork_default_stream() + .map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?; + + let mut event_copy_done: *mut c_void = std::ptr::null_mut(); + unsafe { + let ret = cudaEventCreateWithFlags(&mut event_copy_done, CUDA_EVENT_DISABLE_TIMING); + if ret != 0 { + return Err(MahoutError::Cuda(format!("Failed to create CUDA event: {}", ret))); + } + } + + Ok(Self { + stream_compute, + stream_copy, + event_copy_done, + }) + } + + /// Async H2D copy on copy stream + pub unsafe fn async_copy_to_device(&self, src: &PinnedBuffer, dst: *mut c_void, len_elements: usize) { + crate::profile_scope!("GPU::H2D_Copy"); + unsafe { + cudaMemcpyAsync( + dst, + src.ptr() as *const c_void, + len_elements * std::mem::size_of::<f64>(), + CUDA_MEMCPY_HOST_TO_DEVICE, + self.stream_copy.stream as *mut c_void + ); + } + } + + /// Record copy completion event + pub unsafe fn record_copy_done(&self) { + unsafe { + cudaEventRecord(self.event_copy_done, self.stream_copy.stream as *mut c_void); + } + } + + /// Make compute stream wait for copy completion + pub unsafe fn wait_for_copy(&self) { + crate::profile_scope!("GPU::StreamWait"); + unsafe { + cudaStreamWaitEvent(self.stream_compute.stream as *mut c_void, self.event_copy_done, 0); + } + } + + /// Sync copy stream (safe to reuse host buffer) + pub unsafe fn sync_copy_stream(&self) { + crate::profile_scope!("Pipeline::SyncCopy"); + unsafe { + cudaStreamSynchronize(self.stream_copy.stream as *mut c_void); + } + } +} + +#[cfg(target_os = "linux")] +impl Drop for PipelineContext { + fn drop(&mut self) { + unsafe { + if !self.event_copy_done.is_null() { + cudaEventDestroy(self.event_copy_done); + } + } + } +} /// Chunk processing callback for async pipeline /// @@ -112,24 +199,11 @@ where { crate::profile_scope!("GPU::H2DCopyAsync"); unsafe { - unsafe extern "C" { - fn cudaMemcpyAsync( - dst: *mut c_void, - src: *const c_void, - count: usize, - kind: u32, - stream: *mut c_void, - ) -> i32; - } - let dst_device_ptr = *input_chunk_dev.device_ptr() as *mut c_void; let src_host_ptr = chunk.as_ptr() as *const c_void; let bytes = chunk.len() * std::mem::size_of::<f64>(); let 
stream_handle = current_stream.stream as *mut c_void; - // cudaMemcpyKind: cudaMemcpyHostToDevice = 1 - const CUDA_MEMCPY_HOST_TO_DEVICE: u32 = 1; - let result = cudaMemcpyAsync( dst_device_ptr, src_host_ptr, diff --git a/qdp/qdp-core/src/io.rs b/qdp/qdp-core/src/io.rs index 372f4ef75..e7b253626 100644 --- a/qdp/qdp-core/src/io.rs +++ b/qdp/qdp-core/src/io.rs @@ -438,3 +438,244 @@ pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize, Ok((all_data, num_samples, sample_size)) } + +/// Streaming Parquet reader for List<Float64> and FixedSizeList<Float64> columns +/// +/// Reads Parquet files in chunks without loading entire file into memory. +/// Supports efficient streaming for large files via Producer-Consumer pattern. +pub struct ParquetBlockReader { + reader: parquet::arrow::arrow_reader::ParquetRecordBatchReader, + sample_size: Option<usize>, + leftover_data: Vec<f64>, + leftover_cursor: usize, + pub total_rows: usize, +} + +impl ParquetBlockReader { + /// Create a new streaming Parquet reader + /// + /// # Arguments + /// * `path` - Path to the Parquet file + /// * `batch_size` - Optional batch size (defaults to 2048) + pub fn new<P: AsRef<Path>>(path: P, batch_size: Option<usize>) -> Result<Self> { + let file = File::open(path.as_ref()).map_err(|e| { + MahoutError::Io(format!("Failed to open Parquet file: {}", e)) + })?; + + let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| { + MahoutError::Io(format!("Failed to create Parquet reader: {}", e)) + })?; + + let schema = builder.schema(); + if schema.fields().len() != 1 { + return Err(MahoutError::InvalidInput(format!( + "Expected exactly one column, got {}", + schema.fields().len() + ))); + } + + let field = &schema.fields()[0]; + match field.data_type() { + DataType::List(child_field) => { + if !matches!(child_field.data_type(), DataType::Float64) { + return Err(MahoutError::InvalidInput(format!( + "Expected List<Float64> column, got List<{:?}>", + child_field.data_type() + ))); + } + } + DataType::FixedSizeList(child_field, _) => { + if !matches!(child_field.data_type(), DataType::Float64) { + return Err(MahoutError::InvalidInput(format!( + "Expected FixedSizeList<Float64> column, got FixedSizeList<{:?}>", + child_field.data_type() + ))); + } + } + _ => { + return Err(MahoutError::InvalidInput(format!( + "Expected List<Float64> or FixedSizeList<Float64> column, got {:?}", + field.data_type() + ))); + } + } + + let total_rows = builder.metadata().file_metadata().num_rows() as usize; + + let batch_size = batch_size.unwrap_or(2048); + let reader = builder + .with_batch_size(batch_size) + .build() + .map_err(|e| { + MahoutError::Io(format!("Failed to build Parquet reader: {}", e)) + })?; + + Ok(Self { + reader, + sample_size: None, + leftover_data: Vec::new(), + leftover_cursor: 0, + total_rows, + }) + } + + /// Get the sample size (number of elements per sample) + pub fn get_sample_size(&self) -> Option<usize> { + self.sample_size + } + + /// Read a chunk of data into the provided buffer + /// + /// Handles leftover data from previous reads and ensures sample boundaries are respected. + /// Returns the number of elements written to the buffer. 
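+    ///
+    /// # Example
+    ///
+    /// A minimal sketch (the file name is hypothetical):
+    ///
+    /// ```ignore
+    /// let mut reader = ParquetBlockReader::new("samples.parquet", None)?;
+    /// let mut buf = vec![0.0_f64; 1 << 20];
+    /// loop {
+    ///     let written = reader.read_chunk(&mut buf)?;
+    ///     if written == 0 { break; }
+    ///     // buf[..written] holds a whole number of samples
+    /// }
+    /// ```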
+ pub fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> { + let mut written = 0; + let buf_cap = buffer.len(); + let calc_limit = |ss: usize| -> usize { + if ss == 0 { + buf_cap + } else { + (buf_cap / ss) * ss + } + }; + let mut limit = self.sample_size.map_or(buf_cap, calc_limit); + + if self.sample_size.is_some() { + while self.leftover_cursor < self.leftover_data.len() && written < limit { + let available = self.leftover_data.len() - self.leftover_cursor; + let space_left = limit - written; + let to_copy = std::cmp::min(available, space_left); + + if to_copy > 0 { + buffer[written..written+to_copy].copy_from_slice( + &self.leftover_data[self.leftover_cursor..self.leftover_cursor+to_copy] + ); + written += to_copy; + self.leftover_cursor += to_copy; + + if self.leftover_cursor == self.leftover_data.len() { + self.leftover_data.clear(); + self.leftover_cursor = 0; + break; + } + } else { + break; + } + } + } + + while written < limit { + match self.reader.next() { + Some(Ok(batch)) => { + if batch.num_columns() == 0 { + continue; + } + let column = batch.column(0); + + let (current_sample_size, batch_values) = match column.data_type() { + DataType::List(_) => { + let list_array = column + .as_any() + .downcast_ref::<ListArray>() + .ok_or_else(|| MahoutError::Io("Failed to downcast to ListArray".to_string()))?; + + if list_array.len() == 0 { + continue; + } + + let mut batch_values = Vec::new(); + let mut current_sample_size = None; + for i in 0..list_array.len() { + let value_array = list_array.value(i); + let float_array = value_array + .as_any() + .downcast_ref::<Float64Array>() + .ok_or_else(|| MahoutError::Io("List values must be Float64".to_string()))?; + + if i == 0 { + current_sample_size = Some(float_array.len()); + } + + if float_array.null_count() == 0 { + batch_values.extend_from_slice(float_array.values()); + } else { + return Err(MahoutError::Io("Null value encountered in Float64Array during quantum encoding. Please check data quality at the source.".to_string())); + } + } + + (current_sample_size.expect("list_array.len() > 0 ensures at least one element"), batch_values) + } + DataType::FixedSizeList(_, size) => { + let list_array = column + .as_any() + .downcast_ref::<FixedSizeListArray>() + .ok_or_else(|| MahoutError::Io("Failed to downcast to FixedSizeListArray".to_string()))?; + + if list_array.len() == 0 { + continue; + } + + let current_sample_size = *size as usize; + + let values = list_array.values(); + let float_array = values + .as_any() + .downcast_ref::<Float64Array>() + .ok_or_else(|| MahoutError::Io("FixedSizeList values must be Float64".to_string()))?; + + let mut batch_values = Vec::new(); + if float_array.null_count() == 0 { + batch_values.extend_from_slice(float_array.values()); + } else { + return Err(MahoutError::Io("Null value encountered in Float64Array during quantum encoding. 
Please check data quality at the source.".to_string())); + } + + (current_sample_size, batch_values) + } + _ => { + return Err(MahoutError::Io(format!( + "Expected List<Float64> or FixedSizeList<Float64>, got {:?}", + column.data_type() + ))); + } + }; + + if self.sample_size.is_none() { + self.sample_size = Some(current_sample_size); + limit = calc_limit(current_sample_size); + } else { + if let Some(expected_size) = self.sample_size { + if current_sample_size != expected_size { + return Err(MahoutError::InvalidInput(format!( + "Inconsistent sample sizes: expected {}, got {}", + expected_size, current_sample_size + ))); + } + } + } + + let available = batch_values.len(); + let space_left = limit - written; + + if available <= space_left { + buffer[written..written+available].copy_from_slice(&batch_values); + written += available; + } else { + if space_left > 0 { + buffer[written..written+space_left].copy_from_slice(&batch_values[0..space_left]); + written += space_left; + } + self.leftover_data.clear(); + self.leftover_data.extend_from_slice(&batch_values[space_left..]); + self.leftover_cursor = 0; + break; + } + }, + Some(Err(e)) => return Err(MahoutError::Io(format!("Parquet read error: {}", e))), + None => break, + } + } + + Ok(written) + } +} diff --git a/qdp/qdp-core/src/lib.rs b/qdp/qdp-core/src/lib.rs index e14e2bab0..429813c26 100644 --- a/qdp/qdp-core/src/lib.rs +++ b/qdp/qdp-core/src/lib.rs @@ -26,10 +26,28 @@ pub use error::{MahoutError, Result}; pub use gpu::memory::Precision; use std::sync::Arc; +#[cfg(target_os = "linux")] +use std::ffi::c_void; +#[cfg(target_os = "linux")] +use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; +#[cfg(target_os = "linux")] +use std::thread; -use cudarc::driver::CudaDevice; +use cudarc::driver::{CudaDevice, DevicePtr, DevicePtrMut}; use crate::dlpack::DLManagedTensor; use crate::gpu::get_encoder; +#[cfg(target_os = "linux")] +use crate::gpu::memory::{PinnedBuffer, GpuStateVector}; +#[cfg(target_os = "linux")] +use crate::gpu::PipelineContext; +#[cfg(target_os = "linux")] +use qdp_kernels::{launch_l2_norm_batch, launch_amplitude_encode_batch}; + +/// 512MB staging buffer for large Parquet row groups (reduces fragmentation) +#[cfg(target_os = "linux")] +const STAGE_SIZE_BYTES: usize = 512 * 1024 * 1024; +#[cfg(target_os = "linux")] +const STAGE_SIZE_ELEMENTS: usize = STAGE_SIZE_BYTES / std::mem::size_of::<f64>(); /// Main entry point for Mahout QDP /// @@ -134,18 +152,18 @@ impl QdpEngine { Ok(dlpack_ptr) } - /// Load data from Parquet file and encode into quantum state + /// Streaming Parquet encoder with multi-threaded IO /// - /// Reads Parquet file with List<Float64> column format and encodes all samples - /// in a single batch operation. Bypasses pandas for maximum performance. + /// Uses Producer-Consumer pattern: IO thread reads Parquet while GPU processes data. + /// Double-buffered (ping-pong) for maximum pipeline overlap. 
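+    ///
+    /// # Example
+    ///
+    /// A minimal sketch (the `QdpEngine::new(device_id)` constructor and the
+    /// file name are assumed here):
+    ///
+    /// ```ignore
+    /// let engine = QdpEngine::new(0)?;
+    /// let states = engine.encode_from_parquet("samples.parquet", 10, "amplitude")?;
+    /// // states: *mut DLManagedTensor, shape [num_samples, 1024]
+    /// ```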
/// /// # Arguments - /// * `path` - Path to Parquet file + /// * `path` - Path to Parquet file with List<Float64> column /// * `num_qubits` - Number of qubits - /// * `encoding_method` - Strategy: "amplitude", "angle", or "basis" + /// * `encoding_method` - Currently only "amplitude" supported for streaming /// /// # Returns - /// Single DLPack pointer containing all encoded states (shape: [num_samples, 2^num_qubits]) + /// DLPack pointer to encoded states [num_samples, 2^num_qubits] pub fn encode_from_parquet( &self, path: &str, @@ -154,14 +172,171 @@ impl QdpEngine { ) -> Result<*mut DLManagedTensor> { crate::profile_scope!("Mahout::EncodeFromParquet"); - // Read Parquet directly using Arrow (faster than pandas) - let (batch_data, num_samples, sample_size) = { - crate::profile_scope!("IO::ReadParquetBatch"); - crate::io::read_parquet_batch(path)? - }; + #[cfg(target_os = "linux")] + { + if encoding_method != "amplitude" { + return Err(MahoutError::NotImplemented("Only amplitude encoding supported for streaming".into())); + } - // Encode using fused batch kernel - self.encode_batch(&batch_data, num_samples, sample_size, num_qubits, encoding_method) + let mut reader_core = crate::io::ParquetBlockReader::new(path, None)?; + let num_samples = reader_core.total_rows; + + let total_state_vector = GpuStateVector::new_batch(&self.device, num_samples, num_qubits)?; + let ctx = PipelineContext::new(&self.device)?; + + let dev_in_a = unsafe { self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) } + .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", e)))?; + let dev_in_b = unsafe { self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) } + .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", e)))?; + + let (full_buf_tx, full_buf_rx): (SyncSender<std::result::Result<(PinnedBuffer, usize), MahoutError>>, Receiver<std::result::Result<(PinnedBuffer, usize), MahoutError>>) = sync_channel(2); + let (empty_buf_tx, empty_buf_rx): (SyncSender<PinnedBuffer>, Receiver<PinnedBuffer>) = sync_channel(2); + + let mut host_buf_first = PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?; + let first_len = reader_core.read_chunk(host_buf_first.as_slice_mut())?; + + let sample_size = reader_core.get_sample_size() + .ok_or_else(|| MahoutError::InvalidInput("Could not determine sample size".into()))?; + + if sample_size == 0 { + return Err(MahoutError::InvalidInput("Sample size cannot be zero".into())); + } + + full_buf_tx.send(Ok((host_buf_first, first_len))) + .map_err(|_| MahoutError::Io("Failed to send first buffer".into()))?; + + empty_buf_tx.send(PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?) 
+ .map_err(|_| MahoutError::Io("Failed to send second buffer".into()))?; + + let mut reader = reader_core; + let io_handle = thread::spawn(move || { + loop { + let mut buffer = match empty_buf_rx.recv() { + Ok(b) => b, + Err(_) => break, + }; + + let result = reader.read_chunk(buffer.as_slice_mut()).map(|len| (buffer, len)); + + let should_break = match &result { + Ok((_, len)) => *len == 0, + Err(_) => true, + }; + + if full_buf_tx.send(result).is_err() { break; } + + if should_break { break; } + } + }); + + let mut global_sample_offset: usize = 0; + let mut use_dev_a = true; + let state_len_per_sample = 1 << num_qubits; + + loop { + let (host_buffer, current_len) = match full_buf_rx.recv() { + Ok(Ok((buffer, len))) => (buffer, len), + Ok(Err(e)) => return Err(e), + Err(_) => return Err(MahoutError::Io("IO thread disconnected".into())), + }; + + if current_len == 0 { break; } + + if current_len % sample_size != 0 { + return Err(MahoutError::InvalidInput(format!( + "Chunk length {} is not a multiple of sample size {}", + current_len, sample_size + ))); + } + + let samples_in_chunk = current_len / sample_size; + if samples_in_chunk > 0 { + let dev_ptr = if use_dev_a { *dev_in_a.device_ptr() } else { *dev_in_b.device_ptr() }; + + unsafe { + crate::profile_scope!("GPU::Dispatch"); + + ctx.async_copy_to_device(&host_buffer, dev_ptr as *mut c_void, current_len); + ctx.record_copy_done(); + ctx.wait_for_copy(); + + { + crate::profile_scope!("GPU::BatchEncode"); + let offset_elements = global_sample_offset + .checked_mul(state_len_per_sample) + .ok_or_else(|| MahoutError::MemoryAllocation( + format!("Offset calculation overflow: {} * {}", global_sample_offset, state_len_per_sample) + ))?; + + let offset_bytes = offset_elements + .checked_mul(std::mem::size_of::<qdp_kernels::CuDoubleComplex>()) + .ok_or_else(|| MahoutError::MemoryAllocation( + format!("Offset bytes calculation overflow: {} * {}", offset_elements, std::mem::size_of::<qdp_kernels::CuDoubleComplex>()) + ))?; + + let state_ptr_offset = total_state_vector.ptr_void().cast::<u8>() + .add(offset_bytes) + .cast::<std::ffi::c_void>(); + + let mut norm_buffer = self.device.alloc_zeros::<f64>(samples_in_chunk) + .map_err(|e| MahoutError::MemoryAllocation(format!("Failed to allocate norm buffer: {:?}", e)))?; + + { + crate::profile_scope!("GPU::NormBatch"); + let ret = launch_l2_norm_batch( + dev_ptr as *const f64, + samples_in_chunk, + sample_size, + *norm_buffer.device_ptr_mut() as *mut f64, + ctx.stream_compute.stream as *mut c_void + ); + if ret != 0 { + return Err(MahoutError::KernelLaunch(format!("Norm kernel error: {}", ret))); + } + } + + { + crate::profile_scope!("GPU::EncodeBatch"); + let ret = launch_amplitude_encode_batch( + dev_ptr as *const f64, + state_ptr_offset, + *norm_buffer.device_ptr() as *const f64, + samples_in_chunk, + sample_size, + state_len_per_sample, + ctx.stream_compute.stream as *mut c_void + ); + if ret != 0 { + return Err(MahoutError::KernelLaunch(format!("Encode kernel error: {}", ret))); + } + } + } + + ctx.sync_copy_stream(); + } + global_sample_offset = global_sample_offset + .checked_add(samples_in_chunk) + .ok_or_else(|| MahoutError::MemoryAllocation( + format!("Sample offset overflow: {} + {}", global_sample_offset, samples_in_chunk) + ))?; + use_dev_a = !use_dev_a; + } + + let _ = empty_buf_tx.send(host_buffer); + } + + self.device.synchronize().map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?; + io_handle.join().map_err(|e| MahoutError::Io(format!("IO thread panicked: {:?}", e)))?; + + let 
dlpack_ptr = total_state_vector.to_dlpack(); + Ok(dlpack_ptr) + } + + #[cfg(not(target_os = "linux"))] + { + let (batch_data, num_samples, sample_size) = crate::io::read_parquet_batch(path)?; + self.encode_batch(&batch_data, num_samples, sample_size, num_qubits, encoding_method) + } } /// Load data from Arrow IPC file and encode into quantum state @@ -185,13 +360,11 @@ impl QdpEngine { ) -> Result<*mut DLManagedTensor> { crate::profile_scope!("Mahout::EncodeFromArrowIPC"); - // Read Arrow IPC (6x faster than Parquet) let (batch_data, num_samples, sample_size) = { crate::profile_scope!("IO::ReadArrowIPCBatch"); crate::io::read_arrow_ipc_batch(path)? }; - // Encode using fused batch kernel self.encode_batch(&batch_data, num_samples, sample_size, num_qubits, encoding_method) } } diff --git a/qdp/qdp-core/tests/arrow_ipc_io.rs b/qdp/qdp-core/tests/arrow_ipc_io.rs index 6ef206954..1a9289c08 100644 --- a/qdp/qdp-core/tests/arrow_ipc_io.rs +++ b/qdp/qdp-core/tests/arrow_ipc_io.rs @@ -14,15 +14,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use qdp_core::io::{read_arrow_ipc_batch, read_parquet_batch}; +use qdp_core::io::read_arrow_ipc_batch; use arrow::array::{Float64Array, FixedSizeListArray}; use arrow::datatypes::{DataType, Field, Schema}; use arrow::ipc::writer::FileWriter as ArrowFileWriter; use std::fs::{self, File}; use std::sync::Arc; -mod common; - #[test] fn test_read_arrow_ipc_fixed_size_list() { let temp_path = "/tmp/test_arrow_ipc_fixed.arrow"; diff --git a/qdp/qdp-core/tests/common/mod.rs b/qdp/qdp-core/tests/common/mod.rs index f105a5436..9afb31e40 100644 --- a/qdp/qdp-core/tests/common/mod.rs +++ b/qdp/qdp-core/tests/common/mod.rs @@ -14,7 +14,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -/// Create test data with normalized values +/// Creates normalized test data +#[allow(dead_code)] // Used by multiple test modules pub fn create_test_data(size: usize) -> Vec<f64> { (0..size).map(|i| (i as f64) / (size as f64)).collect() }
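
The per-chunk flow above pairs a PinnedBuffer with the dual-stream
PipelineContext: the copy stream performs the H2D transfer, an event hands
off to the compute stream, and syncing the copy stream makes the host buffer
reusable. A condensed sketch (kernel launches and error handling elided;
`device`, `reader`, and the device pointer `dev_in` are assumed to exist):

    let ctx = PipelineContext::new(&device)?;
    let mut host = PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?;
    let n = reader.read_chunk(host.as_slice_mut())?;
    unsafe {
        ctx.async_copy_to_device(&host, dev_in, n); // H2D on the copy stream
        ctx.record_copy_done();                     // record event on the copy stream
        ctx.wait_for_copy();                        // compute stream waits on the event
        // ... launch norm/encode kernels on ctx.stream_compute ...
        ctx.sync_copy_stream();                     // safe to refill `host`
    }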
