This is an automated email from the ASF dual-hosted git repository.

guanmingchiu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/mahout.git

commit 36b1872d0455ee6fe39cd93c87cda9707cda3704
Author: KUAN-HAO HUANG <[email protected]>
AuthorDate: Thu Dec 18 16:35:53 2025 +0800

    [QDP] improve memory management (#708)
    
    * DataLoader-Benchmark
    
    Signed-off-by: 400Ping <[email protected]>
    
    * update output
    
    Signed-off-by: 400Ping <[email protected]>
    
    * update
    
    Signed-off-by: 400Ping <[email protected]>
    
    * [Fix] Remove print in Qiskit
    
    Signed-off-by: 400Ping <[email protected]>
    
    * fix
    
    Signed-off-by: 400Ping <[email protected]>
    
    * fix pre-commit
    
    Signed-off-by: 400Ping <[email protected]>
    
    * [QDP] improve memory management
    
    * fix pre-commit
    
    Signed-off-by: 400Ping <[email protected]>
    
    * update doc
    
    Signed-off-by: 400Ping <[email protected]>
    
    * update doc
    
    Signed-off-by: 400Ping <[email protected]>
    
    * improve memory usage
    
    * improve pool
    
    * improve for memory
    
    * fix back
    
    * merge and improve
    
    * fix error
    
    * memory release
    
    * follow suggestions
    
    * Address reviewer feedback: safety checks, error handling, and code cleanup
    
    * Revert e2e Benchmark
    
    * refactor ffi
    
    * remove redundant check
    
    * fix error
    
    ---------
    
    Signed-off-by: 400Ping <[email protected]>
    Co-authored-by: 400Ping <[email protected]>
---
 qdp/qdp-core/src/gpu/cuda_ffi.rs            |  52 ++++++
 qdp/qdp-core/src/gpu/encodings/amplitude.rs |  15 +-
 qdp/qdp-core/src/gpu/memory.rs              |  87 +++++++++-
 qdp/qdp-core/src/gpu/mod.rs                 |   6 +
 qdp/qdp-core/src/gpu/pipeline.rs            | 102 ++++++++++--
 qdp/qdp-core/src/io.rs                      | 241 ++++++++++++++++++++++++++++
 qdp/qdp-core/src/lib.rs                     | 205 +++++++++++++++++++++--
 qdp/qdp-core/tests/arrow_ipc_io.rs          |   4 +-
 qdp/qdp-core/tests/common/mod.rs            |   3 +-
 9 files changed, 666 insertions(+), 49 deletions(-)

diff --git a/qdp/qdp-core/src/gpu/cuda_ffi.rs b/qdp/qdp-core/src/gpu/cuda_ffi.rs
new file mode 100644
index 000000000..b61b4e4b2
--- /dev/null
+++ b/qdp/qdp-core/src/gpu/cuda_ffi.rs
@@ -0,0 +1,52 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Centralized CUDA Runtime API FFI declarations.
+
+#![cfg(target_os = "linux")]
+
+use std::ffi::c_void;
+
+pub(crate) const CUDA_MEMCPY_HOST_TO_DEVICE: u32 = 1;
+pub(crate) const CUDA_EVENT_DISABLE_TIMING: u32 = 0x02;
+
+unsafe extern "C" {
+    pub(crate) fn cudaHostAlloc(pHost: *mut *mut c_void, size: usize, flags: 
u32) -> i32;
+    pub(crate) fn cudaFreeHost(ptr: *mut c_void) -> i32;
+
+    pub(crate) fn cudaMemGetInfo(free: *mut usize, total: *mut usize) -> i32;
+
+    pub(crate) fn cudaMemcpyAsync(
+        dst: *mut c_void,
+        src: *const c_void,
+        count: usize,
+        kind: u32,
+        stream: *mut c_void,
+    ) -> i32;
+
+    pub(crate) fn cudaEventCreateWithFlags(event: *mut *mut c_void, flags: 
u32) -> i32;
+    pub(crate) fn cudaEventRecord(event: *mut c_void, stream: *mut c_void) -> 
i32;
+    pub(crate) fn cudaEventDestroy(event: *mut c_void) -> i32;
+    pub(crate) fn cudaStreamWaitEvent(stream: *mut c_void, event: *mut c_void, 
flags: u32) -> i32;
+    pub(crate) fn cudaStreamSynchronize(stream: *mut c_void) -> i32;
+
+    pub(crate) fn cudaMemsetAsync(
+        devPtr: *mut c_void,
+        value: i32,
+        count: usize,
+        stream: *mut c_void,
+    ) -> i32;
+}
diff --git a/qdp/qdp-core/src/gpu/encodings/amplitude.rs 
b/qdp/qdp-core/src/gpu/encodings/amplitude.rs
index 715f0984e..70b38895e 100644
--- a/qdp/qdp-core/src/gpu/encodings/amplitude.rs
+++ b/qdp/qdp-core/src/gpu/encodings/amplitude.rs
@@ -14,7 +14,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// Amplitude encoding: direct state injection with L2 normalization
+// Amplitude encoding: state injection with L2 normalization
 
 use std::sync::Arc;
 
@@ -27,6 +27,8 @@ use super::QuantumEncoder;
 #[cfg(target_os = "linux")]
 use std::ffi::c_void;
 #[cfg(target_os = "linux")]
+use crate::gpu::cuda_ffi::cudaMemsetAsync;
+#[cfg(target_os = "linux")]
 use cudarc::driver::{DevicePtr, DevicePtrMut};
 #[cfg(target_os = "linux")]
 use qdp_kernels::{
@@ -279,7 +281,7 @@ impl QuantumEncoder for AmplitudeEncoder {
 impl AmplitudeEncoder {
 
 
-    /// Async pipeline encoding for large data (SSS-tier optimization)
+    /// Async pipeline encoding for large data
     ///
     /// Uses the generic dual-stream pipeline infrastructure to overlap
     /// data transfer and computation. The pipeline handles all the
@@ -359,15 +361,6 @@ impl AmplitudeEncoder {
             // Zero-fill padding region using CUDA Runtime API
             // Use default stream since pipeline streams are already 
synchronized
             unsafe {
-                unsafe extern "C" {
-                    fn cudaMemsetAsync(
-                        devPtr: *mut c_void,
-                        value: i32,
-                        count: usize,
-                        stream: *mut c_void,
-                    ) -> i32;
-                }
-
                 let result = cudaMemsetAsync(
                     tail_ptr,
                     0,
diff --git a/qdp/qdp-core/src/gpu/memory.rs b/qdp/qdp-core/src/gpu/memory.rs
index 72bf5bc7f..26e7b1383 100644
--- a/qdp/qdp-core/src/gpu/memory.rs
+++ b/qdp/qdp-core/src/gpu/memory.rs
@@ -26,6 +26,9 @@ pub enum Precision {
     Float64,
 }
 
+#[cfg(target_os = "linux")]
+use crate::gpu::cuda_ffi::{cudaFreeHost, cudaHostAlloc, cudaMemGetInfo};
+
 #[cfg(target_os = "linux")]
 fn bytes_to_mib(bytes: usize) -> f64 {
     bytes as f64 / (1024.0 * 1024.0)
@@ -45,10 +48,6 @@ fn cuda_error_to_string(code: i32) -> &'static str {
 #[cfg(target_os = "linux")]
 fn query_cuda_mem_info() -> Result<(usize, usize)> {
     unsafe {
-        unsafe extern "C" {
-            fn cudaMemGetInfo(free: *mut usize, total: *mut usize) -> i32;
-        }
-
         let mut free_bytes: usize = 0;
         let mut total_bytes: usize = 0;
         let result = cudaMemGetInfo(&mut free_bytes as *mut usize, &mut 
total_bytes as *mut usize);
@@ -379,3 +378,83 @@ impl GpuStateVector {
         }
     }
 }
+
+// === Pinned Memory Implementation ===
+
+/// Pinned Host Memory Buffer (Page-Locked)
+///
+/// Enables DMA for H2D copies, doubling bandwidth and reducing CPU usage.
+#[cfg(target_os = "linux")]
+pub struct PinnedBuffer {
+    ptr: *mut f64,
+    size_elements: usize,
+}
+
+#[cfg(target_os = "linux")]
+impl PinnedBuffer {
+    /// Allocate pinned memory
+    pub fn new(elements: usize) -> Result<Self> {
+        unsafe {
+            let bytes = elements
+                .checked_mul(std::mem::size_of::<f64>())
+                .ok_or_else(|| MahoutError::MemoryAllocation(
+                    format!("Requested pinned buffer allocation size overflow 
(elements={})", elements)
+                ))?;
+            let mut ptr: *mut c_void = std::ptr::null_mut();
+
+            let ret = cudaHostAlloc(&mut ptr, bytes, 0); // 
cudaHostAllocDefault
+
+            if ret != 0 {
+                return Err(MahoutError::MemoryAllocation(
+                    format!("cudaHostAlloc failed with error code: {}", ret)
+                ));
+            }
+
+            Ok(Self {
+                ptr: ptr as *mut f64,
+                size_elements: elements,
+            })
+        }
+    }
+
+    /// Get mutable slice to write data into
+    pub fn as_slice_mut(&mut self) -> &mut [f64] {
+        unsafe { std::slice::from_raw_parts_mut(self.ptr, self.size_elements) }
+    }
+
+    /// Get raw pointer for CUDA memcpy
+    pub fn ptr(&self) -> *const f64 {
+        self.ptr
+    }
+
+    pub fn len(&self) -> usize {
+        self.size_elements
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.size_elements == 0
+    }
+}
+
+#[cfg(target_os = "linux")]
+impl Drop for PinnedBuffer {
+    fn drop(&mut self) {
+        unsafe {
+            let result = cudaFreeHost(self.ptr as *mut c_void);
+            if result != 0 {
+                eprintln!(
+                    "Warning: cudaFreeHost failed with error code {} ({})",
+                    result,
+                    cuda_error_to_string(result)
+                );
+            }
+        }
+    }
+}
+
+// Safety: Pinned memory is accessible from any thread
+#[cfg(target_os = "linux")]
+unsafe impl Send for PinnedBuffer {}
+
+#[cfg(target_os = "linux")]
+unsafe impl Sync for PinnedBuffer {}
diff --git a/qdp/qdp-core/src/gpu/mod.rs b/qdp/qdp-core/src/gpu/mod.rs
index fe7cdace0..77b41f6e8 100644
--- a/qdp/qdp-core/src/gpu/mod.rs
+++ b/qdp/qdp-core/src/gpu/mod.rs
@@ -18,6 +18,12 @@ pub mod memory;
 pub mod encodings;
 pub mod pipeline;
 
+#[cfg(target_os = "linux")]
+pub(crate) mod cuda_ffi;
+
 pub use memory::GpuStateVector;
 pub use encodings::{QuantumEncoder, AmplitudeEncoder, AngleEncoder, 
BasisEncoder, get_encoder};
 pub use pipeline::run_dual_stream_pipeline;
+
+#[cfg(target_os = "linux")]
+pub use pipeline::PipelineContext;
diff --git a/qdp/qdp-core/src/gpu/pipeline.rs b/qdp/qdp-core/src/gpu/pipeline.rs
index 3c5921c38..bbe74c7b0 100644
--- a/qdp/qdp-core/src/gpu/pipeline.rs
+++ b/qdp/qdp-core/src/gpu/pipeline.rs
@@ -24,7 +24,94 @@ use std::ffi::c_void;
 use cudarc::driver::{CudaDevice, CudaSlice, DevicePtr, safe::CudaStream};
 use crate::error::{MahoutError, Result};
 #[cfg(target_os = "linux")]
-use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error};
+use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error, 
PinnedBuffer};
+
+#[cfg(target_os = "linux")]
+use crate::gpu::cuda_ffi::{
+    cudaEventCreateWithFlags, cudaEventDestroy, cudaEventRecord, 
cudaMemcpyAsync, cudaStreamSynchronize,
+    cudaStreamWaitEvent, CUDA_EVENT_DISABLE_TIMING, CUDA_MEMCPY_HOST_TO_DEVICE,
+};
+
+/// Dual-stream pipeline context: manages compute/copy streams and sync events
+#[cfg(target_os = "linux")]
+pub struct PipelineContext {
+    pub stream_compute: CudaStream,
+    pub stream_copy: CudaStream,
+    event_copy_done: *mut c_void,
+}
+
+#[cfg(target_os = "linux")]
+impl PipelineContext {
+    /// Create dual streams and sync event
+    pub fn new(device: &Arc<CudaDevice>) -> Result<Self> {
+        let stream_compute = device.fork_default_stream()
+            .map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?;
+        let stream_copy = device.fork_default_stream()
+            .map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?;
+
+        let mut event_copy_done: *mut c_void = std::ptr::null_mut();
+        unsafe {
+            let ret = cudaEventCreateWithFlags(&mut event_copy_done, 
CUDA_EVENT_DISABLE_TIMING);
+            if ret != 0 {
+                return Err(MahoutError::Cuda(format!("Failed to create CUDA 
event: {}", ret)));
+            }
+        }
+
+        Ok(Self {
+            stream_compute,
+            stream_copy,
+            event_copy_done,
+        })
+    }
+
+    /// Async H2D copy on copy stream
+    pub unsafe fn async_copy_to_device(&self, src: &PinnedBuffer, dst: *mut 
c_void, len_elements: usize) {
+        crate::profile_scope!("GPU::H2D_Copy");
+        unsafe {
+            cudaMemcpyAsync(
+                dst,
+                src.ptr() as *const c_void,
+                len_elements * std::mem::size_of::<f64>(),
+                CUDA_MEMCPY_HOST_TO_DEVICE,
+                self.stream_copy.stream as *mut c_void
+            );
+        }
+    }
+
+    /// Record copy completion event
+    pub unsafe fn record_copy_done(&self) {
+        unsafe {
+            cudaEventRecord(self.event_copy_done, self.stream_copy.stream as 
*mut c_void);
+        }
+    }
+
+    /// Make compute stream wait for copy completion
+    pub unsafe fn wait_for_copy(&self) {
+        crate::profile_scope!("GPU::StreamWait");
+        unsafe {
+            cudaStreamWaitEvent(self.stream_compute.stream as *mut c_void, 
self.event_copy_done, 0);
+        }
+    }
+
+    /// Sync copy stream (safe to reuse host buffer)
+    pub unsafe fn sync_copy_stream(&self) {
+        crate::profile_scope!("Pipeline::SyncCopy");
+        unsafe {
+            cudaStreamSynchronize(self.stream_copy.stream as *mut c_void);
+        }
+    }
+}
+
+#[cfg(target_os = "linux")]
+impl Drop for PipelineContext {
+    fn drop(&mut self) {
+        unsafe {
+            if !self.event_copy_done.is_null() {
+                cudaEventDestroy(self.event_copy_done);
+            }
+        }
+    }
+}
 
 /// Chunk processing callback for async pipeline
 ///
@@ -112,24 +199,11 @@ where
         {
             crate::profile_scope!("GPU::H2DCopyAsync");
             unsafe {
-                unsafe extern "C" {
-                    fn cudaMemcpyAsync(
-                        dst: *mut c_void,
-                        src: *const c_void,
-                        count: usize,
-                        kind: u32,
-                        stream: *mut c_void,
-                    ) -> i32;
-                }
-
                 let dst_device_ptr = *input_chunk_dev.device_ptr() as *mut 
c_void;
                 let src_host_ptr = chunk.as_ptr() as *const c_void;
                 let bytes = chunk.len() * std::mem::size_of::<f64>();
                 let stream_handle = current_stream.stream as *mut c_void;
 
-                // cudaMemcpyKind: cudaMemcpyHostToDevice = 1
-                const CUDA_MEMCPY_HOST_TO_DEVICE: u32 = 1;
-
                 let result = cudaMemcpyAsync(
                     dst_device_ptr,
                     src_host_ptr,
diff --git a/qdp/qdp-core/src/io.rs b/qdp/qdp-core/src/io.rs
index 372f4ef75..e7b253626 100644
--- a/qdp/qdp-core/src/io.rs
+++ b/qdp/qdp-core/src/io.rs
@@ -438,3 +438,244 @@ pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) -> 
Result<(Vec<f64>, usize,
 
     Ok((all_data, num_samples, sample_size))
 }
+
+/// Streaming Parquet reader for List<Float64> and FixedSizeList<Float64> 
columns
+///
+/// Reads Parquet files in chunks without loading entire file into memory.
+/// Supports efficient streaming for large files via Producer-Consumer pattern.
+pub struct ParquetBlockReader {
+    reader: parquet::arrow::arrow_reader::ParquetRecordBatchReader,
+    sample_size: Option<usize>,
+    leftover_data: Vec<f64>,
+    leftover_cursor: usize,
+    pub total_rows: usize,
+}
+
+impl ParquetBlockReader {
+    /// Create a new streaming Parquet reader
+    ///
+    /// # Arguments
+    /// * `path` - Path to the Parquet file
+    /// * `batch_size` - Optional batch size (defaults to 2048)
+    pub fn new<P: AsRef<Path>>(path: P, batch_size: Option<usize>) -> 
Result<Self> {
+        let file = File::open(path.as_ref()).map_err(|e| {
+            MahoutError::Io(format!("Failed to open Parquet file: {}", e))
+        })?;
+
+        let builder = 
ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
+            MahoutError::Io(format!("Failed to create Parquet reader: {}", e))
+        })?;
+
+        let schema = builder.schema();
+        if schema.fields().len() != 1 {
+            return Err(MahoutError::InvalidInput(format!(
+                "Expected exactly one column, got {}",
+                schema.fields().len()
+            )));
+        }
+
+        let field = &schema.fields()[0];
+        match field.data_type() {
+            DataType::List(child_field) => {
+                if !matches!(child_field.data_type(), DataType::Float64) {
+                    return Err(MahoutError::InvalidInput(format!(
+                        "Expected List<Float64> column, got List<{:?}>",
+                        child_field.data_type()
+                    )));
+                }
+            }
+            DataType::FixedSizeList(child_field, _) => {
+                if !matches!(child_field.data_type(), DataType::Float64) {
+                    return Err(MahoutError::InvalidInput(format!(
+                        "Expected FixedSizeList<Float64> column, got 
FixedSizeList<{:?}>",
+                        child_field.data_type()
+                    )));
+                }
+            }
+            _ => {
+                return Err(MahoutError::InvalidInput(format!(
+                    "Expected List<Float64> or FixedSizeList<Float64> column, 
got {:?}",
+                    field.data_type()
+                )));
+            }
+        }
+
+        let total_rows = builder.metadata().file_metadata().num_rows() as 
usize;
+
+        let batch_size = batch_size.unwrap_or(2048);
+        let reader = builder
+            .with_batch_size(batch_size)
+            .build()
+            .map_err(|e| {
+                MahoutError::Io(format!("Failed to build Parquet reader: {}", 
e))
+            })?;
+
+        Ok(Self {
+            reader,
+            sample_size: None,
+            leftover_data: Vec::new(),
+            leftover_cursor: 0,
+            total_rows,
+        })
+    }
+
+    /// Get the sample size (number of elements per sample)
+    pub fn get_sample_size(&self) -> Option<usize> {
+        self.sample_size
+    }
+
+    /// Read a chunk of data into the provided buffer
+    ///
+    /// Handles leftover data from previous reads and ensures sample 
boundaries are respected.
+    /// Returns the number of elements written to the buffer.
+    pub fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+        let mut written = 0;
+        let buf_cap = buffer.len();
+        let calc_limit = |ss: usize| -> usize {
+            if ss == 0 {
+                buf_cap
+            } else {
+                (buf_cap / ss) * ss
+            }
+        };
+        let mut limit = self.sample_size.map_or(buf_cap, calc_limit);
+
+        if self.sample_size.is_some() {
+            while self.leftover_cursor < self.leftover_data.len() && written < 
limit {
+                let available = self.leftover_data.len() - 
self.leftover_cursor;
+                let space_left = limit - written;
+                let to_copy = std::cmp::min(available, space_left);
+
+                if to_copy > 0 {
+                    buffer[written..written+to_copy].copy_from_slice(
+                        
&self.leftover_data[self.leftover_cursor..self.leftover_cursor+to_copy]
+                    );
+                    written += to_copy;
+                    self.leftover_cursor += to_copy;
+
+                    if self.leftover_cursor == self.leftover_data.len() {
+                        self.leftover_data.clear();
+                        self.leftover_cursor = 0;
+                        break;
+                    }
+                } else {
+                    break;
+                }
+            }
+        }
+
+        while written < limit {
+            match self.reader.next() {
+                Some(Ok(batch)) => {
+                    if batch.num_columns() == 0 {
+                        continue;
+                    }
+                    let column = batch.column(0);
+
+                    let (current_sample_size, batch_values) = match 
column.data_type() {
+                        DataType::List(_) => {
+                            let list_array = column
+                                .as_any()
+                                .downcast_ref::<ListArray>()
+                                .ok_or_else(|| MahoutError::Io("Failed to 
downcast to ListArray".to_string()))?;
+
+                            if list_array.len() == 0 {
+                                continue;
+                            }
+
+                            let mut batch_values = Vec::new();
+                            let mut current_sample_size = None;
+                            for i in 0..list_array.len() {
+                                let value_array = list_array.value(i);
+                                let float_array = value_array
+                                    .as_any()
+                                    .downcast_ref::<Float64Array>()
+                                    .ok_or_else(|| MahoutError::Io("List 
values must be Float64".to_string()))?;
+
+                                if i == 0 {
+                                    current_sample_size = 
Some(float_array.len());
+                                }
+
+                                if float_array.null_count() == 0 {
+                                    
batch_values.extend_from_slice(float_array.values());
+                                } else {
+                                    return Err(MahoutError::Io("Null value 
encountered in Float64Array during quantum encoding. Please check data quality 
at the source.".to_string()));
+                                }
+                            }
+
+                            (current_sample_size.expect("list_array.len() > 0 
ensures at least one element"), batch_values)
+                        }
+                        DataType::FixedSizeList(_, size) => {
+                            let list_array = column
+                                .as_any()
+                                .downcast_ref::<FixedSizeListArray>()
+                                .ok_or_else(|| MahoutError::Io("Failed to 
downcast to FixedSizeListArray".to_string()))?;
+
+                            if list_array.len() == 0 {
+                                continue;
+                            }
+
+                            let current_sample_size = *size as usize;
+
+                            let values = list_array.values();
+                            let float_array = values
+                                .as_any()
+                                .downcast_ref::<Float64Array>()
+                                .ok_or_else(|| MahoutError::Io("FixedSizeList 
values must be Float64".to_string()))?;
+
+                            let mut batch_values = Vec::new();
+                            if float_array.null_count() == 0 {
+                                
batch_values.extend_from_slice(float_array.values());
+                            } else {
+                                return Err(MahoutError::Io("Null value 
encountered in Float64Array during quantum encoding. Please check data quality 
at the source.".to_string()));
+                            }
+
+                            (current_sample_size, batch_values)
+                        }
+                        _ => {
+                            return Err(MahoutError::Io(format!(
+                                "Expected List<Float64> or 
FixedSizeList<Float64>, got {:?}",
+                                column.data_type()
+                            )));
+                        }
+                    };
+
+                    if self.sample_size.is_none() {
+                        self.sample_size = Some(current_sample_size);
+                        limit = calc_limit(current_sample_size);
+                    } else {
+                        if let Some(expected_size) = self.sample_size {
+                            if current_sample_size != expected_size {
+                                return Err(MahoutError::InvalidInput(format!(
+                                    "Inconsistent sample sizes: expected {}, 
got {}",
+                                    expected_size, current_sample_size
+                                )));
+                            }
+                        }
+                    }
+
+                    let available = batch_values.len();
+                    let space_left = limit - written;
+
+                    if available <= space_left {
+                        
buffer[written..written+available].copy_from_slice(&batch_values);
+                        written += available;
+                    } else {
+                        if space_left > 0 {
+                            
buffer[written..written+space_left].copy_from_slice(&batch_values[0..space_left]);
+                            written += space_left;
+                        }
+                        self.leftover_data.clear();
+                        
self.leftover_data.extend_from_slice(&batch_values[space_left..]);
+                        self.leftover_cursor = 0;
+                        break;
+                    }
+                },
+                Some(Err(e)) => return Err(MahoutError::Io(format!("Parquet 
read error: {}", e))),
+                None => break,
+            }
+        }
+
+        Ok(written)
+    }
+}
diff --git a/qdp/qdp-core/src/lib.rs b/qdp/qdp-core/src/lib.rs
index e14e2bab0..429813c26 100644
--- a/qdp/qdp-core/src/lib.rs
+++ b/qdp/qdp-core/src/lib.rs
@@ -26,10 +26,28 @@ pub use error::{MahoutError, Result};
 pub use gpu::memory::Precision;
 
 use std::sync::Arc;
+#[cfg(target_os = "linux")]
+use std::ffi::c_void;
+#[cfg(target_os = "linux")]
+use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
+#[cfg(target_os = "linux")]
+use std::thread;
 
-use cudarc::driver::CudaDevice;
+use cudarc::driver::{CudaDevice, DevicePtr, DevicePtrMut};
 use crate::dlpack::DLManagedTensor;
 use crate::gpu::get_encoder;
+#[cfg(target_os = "linux")]
+use crate::gpu::memory::{PinnedBuffer, GpuStateVector};
+#[cfg(target_os = "linux")]
+use crate::gpu::PipelineContext;
+#[cfg(target_os = "linux")]
+use qdp_kernels::{launch_l2_norm_batch, launch_amplitude_encode_batch};
+
+/// 512MB staging buffer for large Parquet row groups (reduces fragmentation)
+#[cfg(target_os = "linux")]
+const STAGE_SIZE_BYTES: usize = 512 * 1024 * 1024;
+#[cfg(target_os = "linux")]
+const STAGE_SIZE_ELEMENTS: usize = STAGE_SIZE_BYTES / 
std::mem::size_of::<f64>();
 
 /// Main entry point for Mahout QDP
 ///
@@ -134,18 +152,18 @@ impl QdpEngine {
         Ok(dlpack_ptr)
     }
 
-    /// Load data from Parquet file and encode into quantum state
+    /// Streaming Parquet encoder with multi-threaded IO
     ///
-    /// Reads Parquet file with List<Float64> column format and encodes all 
samples
-    /// in a single batch operation. Bypasses pandas for maximum performance.
+    /// Uses Producer-Consumer pattern: IO thread reads Parquet while GPU 
processes data.
+    /// Double-buffered (ping-pong) for maximum pipeline overlap.
     ///
     /// # Arguments
-    /// * `path` - Path to Parquet file
+    /// * `path` - Path to Parquet file with List<Float64> column
     /// * `num_qubits` - Number of qubits
-    /// * `encoding_method` - Strategy: "amplitude", "angle", or "basis"
+    /// * `encoding_method` - Currently only "amplitude" supported for 
streaming
     ///
     /// # Returns
-    /// Single DLPack pointer containing all encoded states (shape: 
[num_samples, 2^num_qubits])
+    /// DLPack pointer to encoded states [num_samples, 2^num_qubits]
     pub fn encode_from_parquet(
         &self,
         path: &str,
@@ -154,14 +172,171 @@ impl QdpEngine {
     ) -> Result<*mut DLManagedTensor> {
         crate::profile_scope!("Mahout::EncodeFromParquet");
 
-        // Read Parquet directly using Arrow (faster than pandas)
-        let (batch_data, num_samples, sample_size) = {
-            crate::profile_scope!("IO::ReadParquetBatch");
-            crate::io::read_parquet_batch(path)?
-        };
+        #[cfg(target_os = "linux")]
+        {
+            if encoding_method != "amplitude" {
+                return Err(MahoutError::NotImplemented("Only amplitude 
encoding supported for streaming".into()));
+            }
 
-        // Encode using fused batch kernel
-        self.encode_batch(&batch_data, num_samples, sample_size, num_qubits, 
encoding_method)
+            let mut reader_core = crate::io::ParquetBlockReader::new(path, 
None)?;
+            let num_samples = reader_core.total_rows;
+
+            let total_state_vector = GpuStateVector::new_batch(&self.device, 
num_samples, num_qubits)?;
+            let ctx = PipelineContext::new(&self.device)?;
+
+            let dev_in_a = unsafe { 
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+                .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", 
e)))?;
+            let dev_in_b = unsafe { 
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+                .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", 
e)))?;
+
+            let (full_buf_tx, full_buf_rx): 
(SyncSender<std::result::Result<(PinnedBuffer, usize), MahoutError>>, 
Receiver<std::result::Result<(PinnedBuffer, usize), MahoutError>>) = 
sync_channel(2);
+            let (empty_buf_tx, empty_buf_rx): (SyncSender<PinnedBuffer>, 
Receiver<PinnedBuffer>) = sync_channel(2);
+
+            let mut host_buf_first = PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?;
+            let first_len = 
reader_core.read_chunk(host_buf_first.as_slice_mut())?;
+
+            let sample_size = reader_core.get_sample_size()
+                .ok_or_else(|| MahoutError::InvalidInput("Could not determine 
sample size".into()))?;
+
+            if sample_size == 0 {
+                return Err(MahoutError::InvalidInput("Sample size cannot be 
zero".into()));
+            }
+
+            full_buf_tx.send(Ok((host_buf_first, first_len)))
+                .map_err(|_| MahoutError::Io("Failed to send first 
buffer".into()))?;
+
+            empty_buf_tx.send(PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?)
+                .map_err(|_| MahoutError::Io("Failed to send second 
buffer".into()))?;
+
+            let mut reader = reader_core;
+            let io_handle = thread::spawn(move || {
+                loop {
+                    let mut buffer = match empty_buf_rx.recv() {
+                        Ok(b) => b,
+                        Err(_) => break,
+                    };
+
+                    let result = 
reader.read_chunk(buffer.as_slice_mut()).map(|len| (buffer, len));
+
+                    let should_break = match &result {
+                        Ok((_, len)) => *len == 0,
+                        Err(_) => true,
+                    };
+
+                    if full_buf_tx.send(result).is_err() { break; }
+
+                    if should_break { break; }
+                }
+            });
+
+            let mut global_sample_offset: usize = 0;
+            let mut use_dev_a = true;
+            let state_len_per_sample = 1 << num_qubits;
+
+            loop {
+                let (host_buffer, current_len) = match full_buf_rx.recv() {
+                    Ok(Ok((buffer, len))) => (buffer, len),
+                    Ok(Err(e)) => return Err(e),
+                    Err(_) => return Err(MahoutError::Io("IO thread 
disconnected".into())),
+                };
+
+                if current_len == 0 { break; }
+
+                if current_len % sample_size != 0 {
+                    return Err(MahoutError::InvalidInput(format!(
+                        "Chunk length {} is not a multiple of sample size {}",
+                        current_len, sample_size
+                    )));
+                }
+
+                let samples_in_chunk = current_len / sample_size;
+                if samples_in_chunk > 0 {
+                    let dev_ptr = if use_dev_a { *dev_in_a.device_ptr() } else 
{ *dev_in_b.device_ptr() };
+
+                    unsafe {
+                        crate::profile_scope!("GPU::Dispatch");
+
+                        ctx.async_copy_to_device(&host_buffer, dev_ptr as *mut 
c_void, current_len);
+                        ctx.record_copy_done();
+                        ctx.wait_for_copy();
+
+                        {
+                            crate::profile_scope!("GPU::BatchEncode");
+                            let offset_elements = global_sample_offset
+                                .checked_mul(state_len_per_sample)
+                                .ok_or_else(|| MahoutError::MemoryAllocation(
+                                    format!("Offset calculation overflow: {} * 
{}", global_sample_offset, state_len_per_sample)
+                                ))?;
+
+                            let offset_bytes = offset_elements
+                                
.checked_mul(std::mem::size_of::<qdp_kernels::CuDoubleComplex>())
+                                .ok_or_else(|| MahoutError::MemoryAllocation(
+                                    format!("Offset bytes calculation 
overflow: {} * {}", offset_elements, 
std::mem::size_of::<qdp_kernels::CuDoubleComplex>())
+                                ))?;
+
+                            let state_ptr_offset = 
total_state_vector.ptr_void().cast::<u8>()
+                                .add(offset_bytes)
+                                .cast::<std::ffi::c_void>();
+
+                            let mut norm_buffer = 
self.device.alloc_zeros::<f64>(samples_in_chunk)
+                                .map_err(|e| 
MahoutError::MemoryAllocation(format!("Failed to allocate norm buffer: {:?}", 
e)))?;
+
+                            {
+                                crate::profile_scope!("GPU::NormBatch");
+                                let ret = launch_l2_norm_batch(
+                                    dev_ptr as *const f64,
+                                    samples_in_chunk,
+                                    sample_size,
+                                    *norm_buffer.device_ptr_mut() as *mut f64,
+                                    ctx.stream_compute.stream as *mut c_void
+                                );
+                                if ret != 0 {
+                                    return 
Err(MahoutError::KernelLaunch(format!("Norm kernel error: {}", ret)));
+                                }
+                            }
+
+                            {
+                                crate::profile_scope!("GPU::EncodeBatch");
+                                let ret = launch_amplitude_encode_batch(
+                                    dev_ptr as *const f64,
+                                    state_ptr_offset,
+                                    *norm_buffer.device_ptr() as *const f64,
+                                    samples_in_chunk,
+                                    sample_size,
+                                    state_len_per_sample,
+                                    ctx.stream_compute.stream as *mut c_void
+                                );
+                                if ret != 0 {
+                                    return 
Err(MahoutError::KernelLaunch(format!("Encode kernel error: {}", ret)));
+                                }
+                            }
+                        }
+
+                        ctx.sync_copy_stream();
+                    }
+                    global_sample_offset = global_sample_offset
+                        .checked_add(samples_in_chunk)
+                        .ok_or_else(|| MahoutError::MemoryAllocation(
+                            format!("Sample offset overflow: {} + {}", 
global_sample_offset, samples_in_chunk)
+                        ))?;
+                    use_dev_a = !use_dev_a;
+                }
+
+                let _ = empty_buf_tx.send(host_buffer);
+            }
+
+            self.device.synchronize().map_err(|e| 
MahoutError::Cuda(format!("{:?}", e)))?;
+            io_handle.join().map_err(|e| MahoutError::Io(format!("IO thread 
panicked: {:?}", e)))?;
+
+            let dlpack_ptr = total_state_vector.to_dlpack();
+            Ok(dlpack_ptr)
+        }
+
+        #[cfg(not(target_os = "linux"))]
+        {
+            let (batch_data, num_samples, sample_size) = 
crate::io::read_parquet_batch(path)?;
+            self.encode_batch(&batch_data, num_samples, sample_size, 
num_qubits, encoding_method)
+        }
     }
 
     /// Load data from Arrow IPC file and encode into quantum state
@@ -185,13 +360,11 @@ impl QdpEngine {
     ) -> Result<*mut DLManagedTensor> {
         crate::profile_scope!("Mahout::EncodeFromArrowIPC");
 
-        // Read Arrow IPC (6x faster than Parquet)
         let (batch_data, num_samples, sample_size) = {
             crate::profile_scope!("IO::ReadArrowIPCBatch");
             crate::io::read_arrow_ipc_batch(path)?
         };
 
-        // Encode using fused batch kernel
         self.encode_batch(&batch_data, num_samples, sample_size, num_qubits, 
encoding_method)
     }
 }
diff --git a/qdp/qdp-core/tests/arrow_ipc_io.rs 
b/qdp/qdp-core/tests/arrow_ipc_io.rs
index 6ef206954..1a9289c08 100644
--- a/qdp/qdp-core/tests/arrow_ipc_io.rs
+++ b/qdp/qdp-core/tests/arrow_ipc_io.rs
@@ -14,15 +14,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use qdp_core::io::{read_arrow_ipc_batch, read_parquet_batch};
+use qdp_core::io::read_arrow_ipc_batch;
 use arrow::array::{Float64Array, FixedSizeListArray};
 use arrow::datatypes::{DataType, Field, Schema};
 use arrow::ipc::writer::FileWriter as ArrowFileWriter;
 use std::fs::{self, File};
 use std::sync::Arc;
 
-mod common;
-
 #[test]
 fn test_read_arrow_ipc_fixed_size_list() {
     let temp_path = "/tmp/test_arrow_ipc_fixed.arrow";
diff --git a/qdp/qdp-core/tests/common/mod.rs b/qdp/qdp-core/tests/common/mod.rs
index f105a5436..9afb31e40 100644
--- a/qdp/qdp-core/tests/common/mod.rs
+++ b/qdp/qdp-core/tests/common/mod.rs
@@ -14,7 +14,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-/// Create test data with normalized values
+/// Creates normalized test data
+#[allow(dead_code)] // Used by multiple test modules
 pub fn create_test_data(size: usize) -> Vec<f64> {
     (0..size).map(|i| (i as f64) / (size as f64)).collect()
 }


Reply via email to