rich7420 commented on code in PR #851:
URL: https://github.com/apache/mahout/pull/851#discussion_r2702059648

##########
qdp/qdp-core/src/encoding/basis.rs:
##########
@@ -0,0 +1,144 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Basis encoding implementation.
+
+use std::ffi::c_void;
+
+use cudarc::driver::DevicePtr;
+use qdp_kernels::launch_basis_encode_batch;
+
+use super::ChunkEncoder;
+use crate::gpu::PipelineContext;
+use crate::gpu::memory::PinnedHostBuffer;
+use crate::{MahoutError, QdpEngine, Result};
+
+/// Basis encoder has no persistent state.
+pub(crate) struct BasisEncoderState;
+
+/// Basis encoding: maps integer indices to computational basis states.
+pub(crate) struct BasisEncoder;
+
+impl ChunkEncoder for BasisEncoder {
+    type State = BasisEncoderState;
+
+    fn validate_sample_size(&self, sample_size: usize) -> Result<()> {
+        if sample_size != 1 {
+            return Err(MahoutError::InvalidInput(format!(
+                "Basis encoding requires sample_size=1 (one index per sample), got {}",
+                sample_size
+            )));
+        }
+        Ok(())
+    }
+
+    fn init_state(
+        &self,
+        _engine: &QdpEngine,
+        _sample_size: usize,
+        _num_qubits: usize,
+    ) -> Result<Self::State> {
+        Ok(BasisEncoderState)
+    }
+
+    fn encode_chunk(
+        &self,
+        _state: &mut Self::State,
+        engine: &QdpEngine,
+        ctx: &PipelineContext,
+        host_buffer: &PinnedHostBuffer,
+        _dev_ptr: u64,
+        samples_in_chunk: usize,
+        _sample_size: usize,
+        state_ptr_offset: *mut c_void,
+        state_len: usize,
+        num_qubits: usize,
+        global_sample_offset: usize,
+    ) -> Result<()> {
+        unsafe {
+            crate::profile_scope!("GPU::BatchEncode");

Review Comment:
   It seems that in this part the per-chunk CPU allocations, per-chunk GPU
   allocations, and synchronous H2D copies add significant overhead and reduce
   throughput. Maybe we could try reusing the CPU/GPU buffers here?
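
A minimal sketch of the buffer-reuse idea follows. The hunk is cut off right after `profile_scope!`, so the actual per-chunk allocations are not visible here; the `u64` index type, the scratch sizing, the `CudaSlice` field, and the `with_capacity` constructor are assumptions, and only the state struct plus a one-time allocation are shown:

```rust
use cudarc::driver::CudaSlice;

use crate::{MahoutError, QdpEngine, Result};

/// Sketch: persistent scratch buffers, allocated once and reused for every chunk
/// instead of being re-allocated inside `encode_chunk`.
pub(crate) struct BasisEncoderState {
    /// Host-side scratch for the converted basis indices of one chunk.
    index_scratch: Vec<u64>,
    /// Device-side staging for those indices, reused across chunks.
    dev_indices: CudaSlice<u64>,
}

impl BasisEncoderState {
    /// Allocate once in `init_state`, sized for the largest possible chunk.
    fn with_capacity(engine: &QdpEngine, max_samples: usize) -> Result<Self> {
        Ok(BasisEncoderState {
            index_scratch: Vec::with_capacity(max_samples),
            dev_indices: unsafe { engine.device.alloc::<u64>(max_samples) }
                .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", e)))?,
        })
    }
}
```

`encode_chunk` would then refill `index_scratch` and copy it into `dev_indices` (ideally asynchronously on the pipeline stream) rather than allocating a fresh host vector and device buffer, and paying a blocking copy, for every chunk.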

##########
qdp/qdp-python/benchmark/benchmark_e2e.py:
##########
@@ -330,13 +344,7 @@ def run_mahout_arrow(engine, n_qubits, n_samples, encoding_method: str = "amplit
     start_time = time.perf_counter()
     arrow_encode_start = time.perf_counter()
-    try:
-        qtensor = engine.encode(ARROW_FILE, n_qubits, encoding_method)
-    except RuntimeError as e:
-        if "Only amplitude encoding supported" in str(e):
-            print(" Basis encoding not supported for streaming from Arrow, skipping.")
-            return 0.0, None
-        raise

Review Comment:
   In basis mode the Arrow IPC file is written as a scalar `Float64` column,
   but Mahout’s Arrow reader path only supports `List<Float64>` /
   `FixedSizeList<Float64>`. Now that this old skip guard is removed, the call
   may crash here. Maybe we could write the basis Arrow IPC as
   `FixedSizeList<Float64>(len=1)` instead?
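
The benchmark builds the Arrow IPC file in Python, so the following is only a Rust illustration of the target layout using the `arrow` crate: each f64 basis index wrapped in a length-1 `FixedSizeList<Float64>` column. The column name `"features"`, the `write_basis_ipc` helper, and the exact writer setup are assumptions, not code from the PR:

```rust
use std::fs::File;
use std::sync::Arc;

use arrow::array::{ArrayRef, FixedSizeListArray, Float64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ipc::writer::FileWriter;
use arrow::record_batch::RecordBatch;

/// Write one f64 basis index per row as FixedSizeList<Float64>(len=1), the
/// list layout the streaming Arrow reader already accepts for amplitude data.
fn write_basis_ipc(path: &str, indices: &[f64]) -> Result<(), Box<dyn std::error::Error>> {
    // Child values: one Float64 per list entry, list size fixed at 1.
    let values: ArrayRef = Arc::new(Float64Array::from(indices.to_vec()));
    let item = Arc::new(Field::new("item", DataType::Float64, false));
    let column = FixedSizeListArray::try_new(item.clone(), 1, values, None)?;

    // Column name is an assumption; reuse whatever the benchmark writes today.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "features",
        DataType::FixedSizeList(item, 1),
        false,
    )]));
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(column) as ArrayRef])?;

    let mut writer = FileWriter::try_new(File::create(path)?, &schema)?;
    writer.write(&batch)?;
    writer.finish()?;
    Ok(())
}
```

On the Python side the equivalent would be something like `pyarrow.FixedSizeListArray.from_arrays(values, 1)` before writing the IPC file, which would keep the Rust reader path unchanged.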

##########
qdp/qdp-core/src/encoding/mod.rs:
##########
@@ -0,0 +1,309 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Streaming encoding implementations for different quantum encoding methods.
+
+mod amplitude;
+mod basis;
+
+use std::ffi::c_void;
+use std::sync::mpsc::{SyncSender, sync_channel};
+use std::thread;
+
+use cudarc::driver::DevicePtr;
+
+use crate::dlpack::DLManagedTensor;
+use crate::gpu::PipelineContext;
+use crate::gpu::memory::{GpuStateVector, PinnedHostBuffer};
+use crate::reader::StreamingDataReader;
+use crate::{MahoutError, QdpEngine, Result};
+
+/// 512MB staging buffer for large Parquet row groups (reduces fragmentation)
+pub(crate) const STAGE_SIZE_BYTES: usize = 512 * 1024 * 1024;
+pub(crate) const STAGE_SIZE_ELEMENTS: usize = STAGE_SIZE_BYTES / std::mem::size_of::<f64>();
+
+pub(crate) type FullBufferResult = std::result::Result<(PinnedHostBuffer, usize), MahoutError>;
+pub(crate) type FullBufferChannel = (SyncSender<FullBufferResult>, Receiver<FullBufferResult>);
+
+use std::sync::mpsc::Receiver;
+
+/// Trait for chunk-based quantum state encoding.
+///
+/// Implementations provide the encoding-specific logic while the shared
+/// streaming pipeline handles IO, buffering, and GPU memory management.
+pub(crate) trait ChunkEncoder {
+    /// Encoder-specific state (e.g., norm buffer for amplitude encoding).
+    type State;
+
+    /// Validate that the sample size is appropriate for this encoding method.
+    fn validate_sample_size(&self, sample_size: usize) -> Result<()>;
+
+    /// Initialize encoder-specific state.
+    fn init_state(
+        &self,
+        engine: &QdpEngine,
+        sample_size: usize,
+        num_qubits: usize,
+    ) -> Result<Self::State>;
+
+    /// Encode a chunk of samples to quantum states.
+    ///
+    /// # Arguments
+    /// * `state` - Encoder-specific state
+    /// * `engine` - QDP engine for GPU operations
+    /// * `ctx` - Pipeline context for async operations
+    /// * `host_buffer` - Pinned host buffer containing input data
+    /// * `dev_ptr` - Device pointer to staging buffer with copied data
+    /// * `samples_in_chunk` - Number of samples in this chunk
+    /// * `sample_size` - Size of each sample in f64 elements
+    /// * `state_ptr_offset` - Pointer to output location in state vector
+    /// * `state_len` - Length of each quantum state (2^num_qubits)
+    /// * `num_qubits` - Number of qubits
+    #[allow(clippy::too_many_arguments)]
+    fn encode_chunk(
+        &self,
+        state: &mut Self::State,
+        engine: &QdpEngine,
+        ctx: &PipelineContext,
+        host_buffer: &PinnedHostBuffer,
+        dev_ptr: u64,
+        samples_in_chunk: usize,
+        sample_size: usize,
+        state_ptr_offset: *mut c_void,
+        state_len: usize,
+        num_qubits: usize,
+        global_sample_offset: usize,
+    ) -> Result<()>;
+}
+
+/// Shared streaming pipeline for encoding data from Parquet files.
+///
+/// This function handles all the common IO, buffering, and GPU memory
+/// management logic. The actual encoding is delegated to the `ChunkEncoder`.
+pub(crate) fn stream_encode<E: ChunkEncoder>(
+    engine: &QdpEngine,
+    path: &str,
+    num_qubits: usize,
+    encoder: E,
+) -> Result<*mut DLManagedTensor> {
+    // Initialize reader
+    let mut reader_core = crate::io::ParquetBlockReader::new(path, None)?;
+    let num_samples = reader_core.total_rows;
+
+    // Allocate output state vector
+    let total_state_vector = GpuStateVector::new_batch(&engine.device, num_samples, num_qubits)?;
+    const PIPELINE_EVENT_SLOTS: usize = 2;
+    let ctx = PipelineContext::new(&engine.device, PIPELINE_EVENT_SLOTS)?;
+
+    // Double-buffered device staging
+    let dev_in_a = unsafe { engine.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+        .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", e)))?;
+    let dev_in_b = unsafe { engine.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+        .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", e)))?;
+
+    // Channel setup for async IO
+    let (full_buf_tx, full_buf_rx): FullBufferChannel = sync_channel(2);
+    let (empty_buf_tx, empty_buf_rx): (SyncSender<PinnedHostBuffer>, _) = sync_channel(2);
+
+    // Read first chunk to determine sample size
+    let mut host_buf_first = PinnedHostBuffer::new(STAGE_SIZE_ELEMENTS)?;
+    let first_len = reader_core.read_chunk(host_buf_first.as_slice_mut())?;
+
+    let sample_size = reader_core
+        .get_sample_size()
+        .ok_or_else(|| MahoutError::InvalidInput("Could not determine sample size".into()))?;
+
+    // Validate sample size for this encoder
+    encoder.validate_sample_size(sample_size)?;
+
+    // Initialize encoder-specific state
+    let mut encoder_state = encoder.init_state(engine, sample_size, num_qubits)?;
+
+    let state_len = 1 << num_qubits;
+
+    // Send first buffer to processing
+    full_buf_tx
+        .send(Ok((host_buf_first, first_len)))
+        .map_err(|_| MahoutError::Io("Failed to send first buffer".into()))?;
+
+    // Send second empty buffer for IO thread
+    empty_buf_tx
+        .send(PinnedHostBuffer::new(STAGE_SIZE_ELEMENTS)?)
+        .map_err(|_| MahoutError::Io("Failed to send second buffer".into()))?;
+
+    // Spawn IO thread
+    let mut reader = reader_core;
+    let io_handle = thread::spawn(move || {
+        loop {
+            let mut buffer = match empty_buf_rx.recv() {
+                Ok(b) => b,
+                Err(_) => break,
+            };
+
+            let result = reader
+                .read_chunk(buffer.as_slice_mut())
+                .map(|len| (buffer, len));
+
+            let should_break = match &result {
+                Ok((_, len)) => *len == 0,
+                Err(_) => true,
+            };
+
+            if full_buf_tx.send(result).is_err() {
+                break;
+            }
+
+            if should_break {
+                break;
+            }
+        }
+    });
+
+    // Main processing loop
+    let mut global_sample_offset: usize = 0;
+    let mut use_dev_a = true;
+
+    loop {
+        let (host_buffer, current_len) = match full_buf_rx.recv() {
+            Ok(Ok((buffer, len))) => (buffer, len),
+            Ok(Err(e)) => return Err(e),
+            Err(_) => return Err(MahoutError::Io("IO thread disconnected".into())),
+        };
+
+        if current_len == 0 {
+            break;
+        }
+
+        if current_len % sample_size != 0 {
+            return Err(MahoutError::InvalidInput(format!(
+                "Chunk length {} is not a multiple of sample size {}",
+                current_len, sample_size
+            )));
+        }
+
+        let samples_in_chunk = current_len / sample_size;
+        if samples_in_chunk > 0 {
+            let event_slot = if use_dev_a { 0 } else { 1 };
+            let dev_ptr = if use_dev_a {
+                *dev_in_a.device_ptr()
+            } else {
+                *dev_in_b.device_ptr()
+            };
+
+            unsafe {
+                crate::profile_scope!("GPU::Dispatch");
+
+                // Async copy to device
+                ctx.async_copy_to_device(
+                    host_buffer.ptr() as *const c_void,
+                    dev_ptr as *mut c_void,
+                    current_len,
+                )?;
+                ctx.record_copy_done(event_slot)?;
+                ctx.wait_for_copy(event_slot)?;

Review Comment:
   It seems `stream_encode()` always performs an H2D staging copy
   (`async_copy_to_device` + wait) for every chunk, but
   `BasisEncoder::encode_chunk()` does not use `dev_ptr` at all. As a result,
   basis streaming pays an unnecessary H2D memcpy + synchronization per chunk.
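
One possible shape for avoiding that copy is to let each encoder opt out of device staging through a defaulted trait method. The name `needs_device_staging` is hypothetical (nothing like it exists in the PR), and the trait below is pared down to just that hook:

```rust
/// Sketch only: the existing ChunkEncoder methods (validate_sample_size,
/// init_state, encode_chunk) are elided; only the hypothetical opt-out hook
/// is shown.
pub(crate) trait ChunkEncoder {
    /// Should the shared pipeline copy the raw chunk into the device staging
    /// buffer before calling `encode_chunk`? Amplitude encoding consumes
    /// `dev_ptr`, so the default stays `true`.
    fn needs_device_staging(&self) -> bool {
        true
    }
}

pub(crate) struct BasisEncoder;

impl ChunkEncoder for BasisEncoder {
    /// Basis encoding reads its indices from the pinned host buffer and never
    /// touches `dev_ptr`, so the per-chunk H2D memcpy and the copy-done event
    /// synchronization can be skipped.
    fn needs_device_staging(&self) -> bool {
        false
    }
}
```

`stream_encode()` would then wrap the `async_copy_to_device` / `record_copy_done` / `wait_for_copy` block in `if encoder.needs_device_staging() { ... }`, so amplitude encoding keeps the double-buffered staging path while basis encoding skips it.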

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
