This is an automated email from the ASF dual-hosted git repository.

guanmingchiu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/mahout.git

commit 3175ef9311b27013c34db14d70e8d6027f254b40
Author: Guan-Ming (Wesley) Chiu <[email protected]>
AuthorDate: Tue Dec 9 19:04:59 2025 +0800

    [QDP] add batch kernel support (#700)
    
    * [QDP] Add batch encoding support
    
    * Refactor batch pre-processing
---
 qdp/benchmark/benchmark_e2e_final.py        | 158 +++++++++++++++++------
 qdp/qdp-core/src/error.rs                   |   3 +
 qdp/qdp-core/src/gpu/encodings/amplitude.rs | 191 +++++++++++-----------------
 qdp/qdp-core/src/gpu/encodings/mod.rs       |  22 ++--
 qdp/qdp-core/src/gpu/memory.rs              |  42 ++++++
 qdp/qdp-core/src/io.rs                      |  97 +++++++++++++-
 qdp/qdp-core/src/lib.rs                     |  53 +++++---
 qdp/qdp-core/src/preprocessing.rs           |  55 ++++++++
 qdp/qdp-kernels/src/amplitude.cu            | 121 ++++++++++++++++++
 qdp/qdp-kernels/src/lib.rs                  |  15 +++
 qdp/qdp-python/src/lib.rs                   |  19 +--
 11 files changed, 583 insertions(+), 193 deletions(-)

diff --git a/qdp/benchmark/benchmark_e2e_final.py 
b/qdp/benchmark/benchmark_e2e_final.py
index fce668a2c..27efab93d 100644
--- a/qdp/benchmark/benchmark_e2e_final.py
+++ b/qdp/benchmark/benchmark_e2e_final.py
@@ -34,6 +34,7 @@ import torch
 import torch.nn as nn
 import numpy as np
 import os
+import itertools
 import pyarrow as pa
 import pyarrow.parquet as pq
 from mahout_qdp import QdpEngine
@@ -73,10 +74,6 @@ def generate_data(n_qubits, n_samples):
     if os.path.exists(DATA_FILE):
         os.remove(DATA_FILE)
 
-    MAHOUT_DATA_FILE = DATA_FILE.replace(".parquet", "_mahout.parquet")
-    if os.path.exists(MAHOUT_DATA_FILE):
-        os.remove(MAHOUT_DATA_FILE)
-
     print(f"Generating {n_samples} samples of {n_qubits} qubits...")
     dim = 1 << n_qubits
 
@@ -93,20 +90,9 @@ def generate_data(n_qubits, n_samples):
             batch_table = pa.Table.from_arrays([arrays], 
names=["feature_vector"])
             writer.write_table(batch_table)
 
-    # Generate for Mahout (flat Float64 format, one sample per batch)
-    schema_flat = pa.schema([("data", pa.float64())])
-    with pq.ParquetWriter(MAHOUT_DATA_FILE, schema_flat) as writer:
-        for i in range(n_samples):
-            sample_data = np.random.rand(dim).astype(np.float64)
-            array = pa.array(sample_data, type=pa.float64())
-            batch_table = pa.Table.from_arrays([array], names=["data"])
-            writer.write_table(batch_table)
-
     file_size_mb = os.path.getsize(DATA_FILE) / (1024 * 1024)
-    mahout_size_mb = os.path.getsize(MAHOUT_DATA_FILE) / (1024 * 1024)
     print(f"  Generated {n_samples} samples")
-    print(f"  PennyLane/Qiskit format: {file_size_mb:.2f} MB")
-    print(f"  Mahout format: {mahout_size_mb:.2f} MB")
+    print(f"  Parquet file size: {file_size_mb:.2f} MB")
 
 
 # -----------------------------------------------------------
@@ -115,7 +101,7 @@ def generate_data(n_qubits, n_samples):
 def run_qiskit(n_qubits, n_samples):
     if not HAS_QISKIT:
         print("\n[Qiskit] Not installed, skipping.")
-        return 0.0
+        return 0.0, None
 
     print("\n[Qiskit] Full Pipeline (Disk -> GPU)...")
     model = DummyQNN(n_qubits).cuda()
@@ -132,6 +118,8 @@ def run_qiskit(n_qubits, n_samples):
     io_time = time.perf_counter() - start_time
     print(f"  IO Time: {io_time:.4f} s")
 
+    all_qiskit_states = []
+
     # Process batches
     for i in range(0, n_samples, BATCH_SIZE):
         batch = raw_data[i : i + BATCH_SIZE]
@@ -158,12 +146,15 @@ def run_qiskit(n_qubits, n_samples):
         gpu_tensor = torch.tensor(
             np.array(batch_states), device="cuda", dtype=torch.complex64
         )
+        all_qiskit_states.append(gpu_tensor)
         _ = model(gpu_tensor.abs())
 
     torch.cuda.synchronize()
     total_time = time.perf_counter() - start_time
     print(f"\n  Total Time: {total_time:.4f} s")
-    return total_time
+
+    all_qiskit_tensor = torch.cat(all_qiskit_states, dim=0)
+    return total_time, all_qiskit_tensor
 
 
 # -----------------------------------------------------------
@@ -172,7 +163,7 @@ def run_qiskit(n_qubits, n_samples):
 def run_pennylane(n_qubits, n_samples):
     if not HAS_PENNYLANE:
         print("\n[PennyLane] Not installed, skipping.")
-        return 0.0
+        return 0.0, None
 
     print("\n[PennyLane] Full Pipeline (Disk -> GPU)...")
 
@@ -198,6 +189,8 @@ def run_pennylane(n_qubits, n_samples):
     io_time = time.perf_counter() - start_time
     print(f"  IO Time: {io_time:.4f} s")
 
+    all_pl_states = []
+
     # Process batches
     for i in range(0, n_samples, BATCH_SIZE):
         batch_cpu = torch.tensor(raw_data[i : i + BATCH_SIZE])
@@ -208,6 +201,8 @@ def run_pennylane(n_qubits, n_samples):
         except Exception:
             state_cpu = torch.stack([circuit(x) for x in batch_cpu])
 
+        all_pl_states.append(state_cpu)
+
         # Transfer to GPU
         state_gpu = state_cpu.to("cuda", dtype=torch.float32)
         _ = model(state_gpu.abs())
@@ -215,7 +210,13 @@ def run_pennylane(n_qubits, n_samples):
     torch.cuda.synchronize()
     total_time = time.perf_counter() - start_time
     print(f"  Total Time: {total_time:.4f} s")
-    return total_time
+
+    # Stack all collected states
+    all_pl_states_tensor = torch.cat(
+        all_pl_states, dim=0
+    )  # Should handle cases where last batch is smaller
+
+    return total_time, all_pl_states_tensor
 
 
 # -----------------------------------------------------------
@@ -224,28 +225,31 @@ def run_pennylane(n_qubits, n_samples):
 def run_mahout(engine, n_qubits, n_samples):
     print("\n[Mahout] Full Pipeline (Disk -> GPU)...")
     model = DummyQNN(n_qubits).cuda()
-    MAHOUT_DATA_FILE = DATA_FILE.replace(".parquet", "_mahout.parquet")
 
     torch.cuda.synchronize()
     start_time = time.perf_counter()
 
-    # Read Parquet and encode all samples
-    import pyarrow.parquet as pq
-
-    parquet_file = pq.ParquetFile(MAHOUT_DATA_FILE)
+    # Direct Parquet to GPU pipeline
+    parquet_encode_start = time.perf_counter()
+    batched_tensor = engine.encode_from_parquet(DATA_FILE, n_qubits, 
"amplitude")
+    parquet_encode_time = time.perf_counter() - parquet_encode_start
+    print(f"  Parquet->GPU (IO+Encode): {parquet_encode_time:.4f} s")
 
-    all_states = []
-    for batch in parquet_file.iter_batches():
-        sample_data = batch.column(0).to_numpy()
-        qtensor = engine.encode(sample_data.tolist(), n_qubits, "amplitude")
-        gpu_state = torch.from_dlpack(qtensor)
-        all_states.append(gpu_state)
+    # Convert to torch tensor (single DLPack call)
+    dlpack_start = time.perf_counter()
+    gpu_batched = torch.from_dlpack(batched_tensor)
+    dlpack_time = time.perf_counter() - dlpack_start
+    print(f"  DLPack conversion: {dlpack_time:.4f} s")
 
-    # Stack and convert
-    gpu_all_data = torch.stack(all_states).abs().to(torch.float32)
+    # Reshape to [n_samples, state_len] (still complex)
+    state_len = 1 << n_qubits
+    gpu_reshaped = gpu_batched.view(n_samples, state_len)
 
-    encode_time = time.perf_counter() - start_time
-    print(f"  IO + Encode Time: {encode_time:.4f} s")
+    # Convert to float for model (batch already on GPU)
+    reshape_start = time.perf_counter()
+    gpu_all_data = gpu_reshaped.abs().to(torch.float32)
+    reshape_time = time.perf_counter() - reshape_start
+    print(f"  Reshape & convert: {reshape_time:.4f} s")
 
     # Forward pass (data already on GPU)
     for i in range(0, n_samples, BATCH_SIZE):
@@ -255,7 +259,46 @@ def run_mahout(engine, n_qubits, n_samples):
     torch.cuda.synchronize()
     total_time = time.perf_counter() - start_time
     print(f"  Total Time: {total_time:.4f} s")
-    return total_time
+    return total_time, gpu_reshaped
+
+
+def compare_states(name_a, states_a, name_b, states_b):
+    print("\n" + "=" * 70)
+    print(f"VERIFICATION ({name_a} vs {name_b})")
+    print("=" * 70)
+
+    # Ensure both tensors are on GPU for comparison
+    n_compare = min(len(states_a), len(states_b))
+    tensor_a = states_a[:n_compare].cuda()
+    tensor_b = states_b[:n_compare].cuda()
+
+    # Compare Probabilities (|psi|^2)
+    diff_probs = (tensor_a.abs() ** 2 - tensor_b.abs() ** 2).abs().max().item()
+    print(f"Max Probability Difference: {diff_probs:.2e}")
+
+    # Compare Raw Amplitudes
+    # We compare full complex difference magnitude
+    diff_amps = (tensor_a - tensor_b).abs().max().item()
+    print(f"Max Amplitude Difference:   {diff_amps:.2e}")
+
+    if diff_probs < 1e-5:
+        print(">> SUCCESS: Quantum States Match!")
+    else:
+        print(">> FAILURE: States do not match.")
+
+
+def verify_correctness(states_dict):
+    # Filter out None values
+    valid_states = {
+        name: states for name, states in states_dict.items() if states is not 
None
+    }
+
+    if len(valid_states) < 2:
+        return
+
+    keys = sorted(list(valid_states.keys()))
+    for name_a, name_b in itertools.combinations(keys, 2):
+        compare_states(name_a, valid_states[name_a], name_b, 
valid_states[name_b])
 
 
 if __name__ == "__main__":
@@ -268,8 +311,26 @@ if __name__ == "__main__":
     parser.add_argument(
         "--samples", type=int, default=200, help="Number of training samples"
     )
+    parser.add_argument(
+        "--frameworks",
+        nargs="+",
+        default=["mahout", "pennylane"],
+        choices=["mahout", "pennylane", "qiskit", "all"],
+        help="Frameworks to benchmark (default: mahout pennylane). Use 'all' 
to run all available frameworks.",
+    )
     args = parser.parse_args()
 
+    # Expand "all" option
+    if "all" in args.frameworks:
+        all_frameworks = []
+        if "mahout" in args.frameworks or "all" in args.frameworks:
+            all_frameworks.append("mahout")
+        if "pennylane" in args.frameworks or "all" in args.frameworks:
+            all_frameworks.append("pennylane")
+        if "qiskit" in args.frameworks or "all" in args.frameworks:
+            all_frameworks.append("qiskit")
+        args.frameworks = all_frameworks
+
     generate_data(args.qubits, args.samples)
 
     try:
@@ -282,10 +343,20 @@ if __name__ == "__main__":
     print(f"E2E BENCHMARK: {args.qubits} Qubits, {args.samples} Samples")
     print("=" * 70)
 
+    # Initialize results
+    t_pl, pl_all_states = 0.0, None
+    t_mahout, mahout_all_states = 0.0, None
+    t_qiskit, qiskit_all_states = 0.0, None
+
     # Run benchmarks
-    t_pl = run_pennylane(args.qubits, args.samples)
-    t_qiskit = run_qiskit(args.qubits, args.samples)
-    t_mahout = run_mahout(engine, args.qubits, args.samples)
+    if "pennylane" in args.frameworks:
+        t_pl, pl_all_states = run_pennylane(args.qubits, args.samples)
+
+    if "qiskit" in args.frameworks:
+        t_qiskit, qiskit_all_states = run_qiskit(args.qubits, args.samples)
+
+    if "mahout" in args.frameworks:
+        t_mahout, mahout_all_states = run_mahout(engine, args.qubits, 
args.samples)
 
     print("\n" + "=" * 70)
     print("E2E LATENCY (Lower is Better)")
@@ -311,3 +382,12 @@ if __name__ == "__main__":
             print(f"Speedup vs PennyLane: {t_pl / t_mahout:10.2f}x")
         if t_qiskit > 0:
             print(f"Speedup vs Qiskit:    {t_qiskit / t_mahout:10.2f}x")
+
+    # Run Verification after benchmarks
+    verify_correctness(
+        {
+            "Mahout": mahout_all_states,
+            "PennyLane": pl_all_states,
+            "Qiskit": qiskit_all_states,
+        }
+    )
diff --git a/qdp/qdp-core/src/error.rs b/qdp/qdp-core/src/error.rs
index 5cfaabedf..5d7adfbd0 100644
--- a/qdp/qdp-core/src/error.rs
+++ b/qdp/qdp-core/src/error.rs
@@ -36,6 +36,9 @@ pub enum MahoutError {
 
     #[error("I/O error: {0}")]
     Io(String),
+
+    #[error("Not implemented: {0}")]
+    NotImplemented(String),
 }
 
 /// Result type alias for Mahout operations
diff --git a/qdp/qdp-core/src/gpu/encodings/amplitude.rs 
b/qdp/qdp-core/src/gpu/encodings/amplitude.rs
index 6c4277125..ff1490c48 100644
--- a/qdp/qdp-core/src/gpu/encodings/amplitude.rs
+++ b/qdp/qdp-core/src/gpu/encodings/amplitude.rs
@@ -17,7 +17,7 @@
 // Amplitude encoding: direct state injection with L2 normalization
 
 use std::sync::Arc;
-use arrow::array::{Array, Float64Array};
+
 use cudarc::driver::CudaDevice;
 use crate::error::{MahoutError, Result};
 use crate::gpu::memory::GpuStateVector;
@@ -29,7 +29,7 @@ use std::ffi::c_void;
 #[cfg(target_os = "linux")]
 use cudarc::driver::DevicePtr;
 #[cfg(target_os = "linux")]
-use qdp_kernels::launch_amplitude_encode;
+use qdp_kernels::{launch_amplitude_encode, launch_amplitude_encode_batch};
 #[cfg(target_os = "linux")]
 use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error};
 
@@ -133,140 +133,97 @@ impl QuantumEncoder for AmplitudeEncoder {
         }
     }
 
-    fn name(&self) -> &'static str {
-        "amplitude"
-    }
-
-    fn description(&self) -> &'static str {
-        "Amplitude encoding with L2 normalization"
-    }
-
-    /// Override to avoid intermediate Vec allocation. Processes chunks 
directly to GPU offsets.
-    fn encode_chunked(
+    /// Encode multiple samples in a single GPU allocation and kernel launch
+    #[cfg(target_os = "linux")]
+    fn encode_batch(
         &self,
         device: &Arc<CudaDevice>,
-        chunks: &[Float64Array],
+        batch_data: &[f64],
+        num_samples: usize,
+        sample_size: usize,
         num_qubits: usize,
     ) -> Result<GpuStateVector> {
-        #[cfg(target_os = "linux")]
-        {
-            let total_len: usize = chunks.iter().map(|c| c.len()).sum();
-            let state_len = 1 << num_qubits;
+        crate::profile_scope!("AmplitudeEncoder::encode_batch");
 
-            if total_len == 0 {
-                return Err(MahoutError::InvalidInput("Input chunks cannot be 
empty".to_string()));
-            }
-            if total_len > state_len {
-                return Err(MahoutError::InvalidInput(
-                    format!("Total data length {} exceeds state vector size 
{}", total_len, state_len)
-                ));
-            }
-            if num_qubits == 0 || num_qubits > 30 {
-                return Err(MahoutError::InvalidInput(
-                    format!("Number of qubits {} must be between 1 and 30", 
num_qubits)
-                ));
-            }
+        // Validate inputs using shared preprocessor
+        Preprocessor::validate_batch(batch_data, num_samples, sample_size, 
num_qubits)?;
 
-            let state_vector = {
-                crate::profile_scope!("GPU::Alloc");
-                GpuStateVector::new(device, num_qubits)?
-            };
-
-            // Require pre-processed data (no nulls)
-            for chunk in chunks {
-                if chunk.null_count() > 0 {
-                    return Err(MahoutError::InvalidInput(
-                        format!("Chunk contains {} null values. Data must be 
pre-processed before encoding", chunk.null_count())
-                    ));
-                }
-            }
+        let state_len = 1 << num_qubits;
 
-            let norm = {
-                crate::profile_scope!("CPU::L2Norm");
-                let mut norm_sq = 0.0;
-                for chunk in chunks {
-                    norm_sq += chunk.values().iter().map(|&x| x * 
x).sum::<f64>();
-                }
-                let norm = norm_sq.sqrt();
-                if norm == 0.0 {
-                    return Err(MahoutError::InvalidInput("Input data has zero 
norm".to_string()));
-                }
-                norm
+        // Calculate L2 norms using shared preprocessor (parallelized)
+        let norms = Preprocessor::calculate_batch_l2_norms(batch_data, 
num_samples, sample_size)?;
+
+        // Convert to inverse norms
+        let inv_norms: Vec<f64> = norms.iter().map(|n| 1.0 / n).collect();
+
+        // Allocate single large GPU buffer for all states
+        let batch_state_vector = {
+            crate::profile_scope!("GPU::AllocBatch");
+            GpuStateVector::new_batch(device, num_samples, num_qubits)?
+        };
+
+        // Upload input data to GPU
+        let input_batch_gpu = {
+            crate::profile_scope!("GPU::H2D_InputBatch");
+            device.htod_sync_copy(batch_data)
+                .map_err(|e| MahoutError::MemoryAllocation(
+                    format!("Failed to upload batch input: {:?}", e)
+                ))?
+        };
+
+        // Upload inverse norms to GPU
+        let inv_norms_gpu = {
+            crate::profile_scope!("GPU::H2D_Norms");
+            device.htod_sync_copy(&inv_norms)
+                .map_err(|e| MahoutError::MemoryAllocation(
+                    format!("Failed to upload norms: {:?}", e)
+                ))?
+        };
+
+        // Launch batch kernel
+        {
+            crate::profile_scope!("GPU::BatchKernelLaunch");
+            let ret = unsafe {
+                launch_amplitude_encode_batch(
+                    *input_batch_gpu.device_ptr() as *const f64,
+                    batch_state_vector.ptr() as *mut c_void,
+                    *inv_norms_gpu.device_ptr() as *const f64,
+                    num_samples,
+                    sample_size,
+                    state_len,
+                    std::ptr::null_mut(), // default stream
+                )
             };
 
-            let mut current_offset = 0;
-            for chunk in chunks {
-                let chunk_len = chunk.len();
-                if chunk_len == 0 {
-                    continue;
-                }
-
-                let state_ptr_offset = unsafe {
-                    state_vector.ptr().cast::<u8>()
-                        .add(current_offset * 
std::mem::size_of::<qdp_kernels::CuDoubleComplex>())
-                        .cast::<std::ffi::c_void>()
-                };
-
-                let chunk_slice = {
-                    crate::profile_scope!("GPU::ChunkH2DCopy");
-                    // Zero-copy from Arrow buffer to GPU
-                    device.htod_sync_copy(chunk.values())
-                        .map_err(|e| 
MahoutError::MemoryAllocation(format!("Failed to copy chunk: {:?}", e)))?
-                };
-
-                {
-                    crate::profile_scope!("GPU::KernelLaunch");
-                    let ret = unsafe {
-                        launch_amplitude_encode(
-                            *chunk_slice.device_ptr() as *const f64,
-                            state_ptr_offset,
-                            chunk_len,
-                            state_len,
-                            norm,
-                            std::ptr::null_mut(),
-                        )
-                    };
-                    if ret != 0 {
-                        return Err(MahoutError::KernelLaunch(
-                            format!("Kernel launch failed: {} ({})", ret, 
cuda_error_to_string(ret))
-                        ));
-                    }
-                }
-
-                current_offset += chunk_len;
-            }
-
-            if total_len < state_len {
-                let padding_bytes = (state_len - total_len) * 
std::mem::size_of::<qdp_kernels::CuDoubleComplex>();
-                let tail_ptr = unsafe { state_vector.ptr().add(total_len) as 
*mut c_void };
-
-                unsafe {
-                    unsafe extern "C" {
-                        fn cudaMemsetAsync(devPtr: *mut c_void, value: i32, 
count: usize, stream: *mut c_void) -> i32;
-                    }
-                    let result = cudaMemsetAsync(tail_ptr, 0, padding_bytes, 
std::ptr::null_mut());
-                    if result != 0 {
-                        return Err(MahoutError::Cuda(
-                            format!("Failed to zero-fill padding: {} ({})", 
result, cuda_error_to_string(result))
-                        ));
-                    }
-                }
+            if ret != 0 {
+                return Err(MahoutError::KernelLaunch(
+                    format!("Batch kernel launch failed: {} ({})", ret, 
cuda_error_to_string(ret))
+                ));
             }
+        }
 
+        // Synchronize
+        {
+            crate::profile_scope!("GPU::Synchronize");
             device.synchronize()
                 .map_err(|e| MahoutError::Cuda(format!("Sync failed: {:?}", 
e)))?;
-
-            Ok(state_vector)
         }
 
-        #[cfg(not(target_os = "linux"))]
-        {
-            Err(MahoutError::Cuda("CUDA unavailable (non-Linux)".to_string()))
-        }
+        Ok(batch_state_vector)
+    }
+
+    fn name(&self) -> &'static str {
+        "amplitude"
+    }
+
+    fn description(&self) -> &'static str {
+        "Amplitude encoding with L2 normalization"
     }
 }
 
 impl AmplitudeEncoder {
+
+
     /// Async pipeline encoding for large data (SSS-tier optimization)
     ///
     /// Uses the generic dual-stream pipeline infrastructure to overlap
diff --git a/qdp/qdp-core/src/gpu/encodings/mod.rs 
b/qdp/qdp-core/src/gpu/encodings/mod.rs
index 539355c2f..7273179d0 100644
--- a/qdp/qdp-core/src/gpu/encodings/mod.rs
+++ b/qdp/qdp-core/src/gpu/encodings/mod.rs
@@ -17,7 +17,7 @@
 // Quantum encoding strategies (Strategy Pattern)
 
 use std::sync::Arc;
-use arrow::array::Float64Array;
+
 use cudarc::driver::CudaDevice;
 use crate::error::Result;
 use crate::gpu::memory::GpuStateVector;
@@ -34,18 +34,18 @@ pub trait QuantumEncoder: Send + Sync {
         num_qubits: usize,
     ) -> Result<GpuStateVector>;
 
-    /// Encode from chunked Arrow arrays
-    ///
-    /// Default implementation flattens chunks. (TODO: Encoders can override 
for true zero-copy.)
-    fn encode_chunked(
+    /// Encode multiple samples in a single GPU allocation and kernel launch 
(Batch Encoding)
+    fn encode_batch(
         &self,
-        device: &Arc<CudaDevice>,
-        chunks: &[Float64Array],
-        num_qubits: usize,
+        _device: &Arc<CudaDevice>,
+        _batch_data: &[f64],
+        _num_samples: usize,
+        _sample_size: usize,
+        _num_qubits: usize,
     ) -> Result<GpuStateVector> {
-        // Default: flatten and use regular encode
-        let data = crate::io::arrow_to_vec_chunked(chunks);
-        self.encode(device, &data, num_qubits)
+        Err(crate::error::MahoutError::NotImplemented(
+            format!("Batch encoding not implemented for {}", self.name())
+        ))
     }
 
     /// Validate input data before encoding
diff --git a/qdp/qdp-core/src/gpu/memory.rs b/qdp/qdp-core/src/gpu/memory.rs
index 1ac8eabb5..a333e103d 100644
--- a/qdp/qdp-core/src/gpu/memory.rs
+++ b/qdp/qdp-core/src/gpu/memory.rs
@@ -220,4 +220,46 @@ impl GpuStateVector {
     pub fn size_elements(&self) -> usize {
         self.size_elements
     }
+
+    /// Create GPU state vector for a batch of samples
+    /// Allocates num_samples * 2^qubits complex numbers on GPU
+    pub fn new_batch(_device: &Arc<CudaDevice>, num_samples: usize, qubits: 
usize) -> Result<Self> {
+        let single_state_size: usize = 1usize << qubits;
+        let total_elements = num_samples.checked_mul(single_state_size)
+            .ok_or_else(|| MahoutError::MemoryAllocation(
+                format!("Batch size overflow: {} samples * {} elements", 
num_samples, single_state_size)
+            ))?;
+
+        #[cfg(target_os = "linux")]
+        {
+            let requested_bytes = total_elements
+                .checked_mul(std::mem::size_of::<CuDoubleComplex>())
+                .ok_or_else(|| MahoutError::MemoryAllocation(
+                    format!("Requested GPU allocation size overflow 
(elements={})", total_elements)
+                ))?;
+
+            // Pre-flight check
+            ensure_device_memory_available(requested_bytes, "batch state 
vector allocation", Some(qubits))?;
+
+            let slice = unsafe {
+                _device.alloc::<CuDoubleComplex>(total_elements)
+            }.map_err(|e| map_allocation_error(
+                requested_bytes,
+                "batch state vector allocation",
+                Some(qubits),
+                e,
+            ))?;
+
+            Ok(Self {
+                buffer: Arc::new(GpuBufferRaw { slice }),
+                num_qubits: qubits,
+                size_elements: total_elements,
+            })
+        }
+
+        #[cfg(not(target_os = "linux"))]
+        {
+            Err(MahoutError::Cuda("CUDA is only available on Linux. This build 
does not support GPU operations.".to_string()))
+        }
+    }
 }
diff --git a/qdp/qdp-core/src/io.rs b/qdp/qdp-core/src/io.rs
index fc9f09cd4..93ad1d018 100644
--- a/qdp/qdp-core/src/io.rs
+++ b/qdp/qdp-core/src/io.rs
@@ -22,7 +22,7 @@ use std::fs::File;
 use std::path::Path;
 use std::sync::Arc;
 
-use arrow::array::{Array, ArrayRef, Float64Array, RecordBatch};
+use arrow::array::{Array, ArrayRef, Float64Array, ListArray, RecordBatch};
 use arrow::datatypes::{DataType, Field, Schema};
 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
 use parquet::arrow::ArrowWriter;
@@ -270,3 +270,98 @@ pub fn write_arrow_to_parquet<P: AsRef<Path>>(
 
     Ok(())
 }
+
+/// Read batch data from Parquet file with list column format
+///
+/// Efficiently reads Parquet files where each row contains a list of values.
+/// Returns a flattened Vec with all samples concatenated, suitable for batch 
encoding.
+///
+/// # Arguments
+/// * `path` - Path to Parquet file
+///
+/// # Returns
+/// Tuple of (flattened_data, num_samples, sample_size)
+///
+/// # Example
+/// File format: column "feature_vector" with type List<Float64>
+/// Each row = one sample = one list of floats
+pub fn read_parquet_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize, 
usize)> {
+    let file = File::open(path.as_ref()).map_err(|e| {
+        MahoutError::Io(format!("Failed to open Parquet file: {}", e))
+    })?;
+
+    let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
+        MahoutError::Io(format!("Failed to create Parquet reader: {}", e))
+    })?;
+
+    let mut reader = builder.build().map_err(|e| {
+        MahoutError::Io(format!("Failed to build Parquet reader: {}", e))
+    })?;
+
+    let mut all_data = Vec::new();
+    let mut num_samples = 0;
+    let mut sample_size = None;
+
+    while let Some(batch_result) = reader.next() {
+        let batch = batch_result.map_err(|e| {
+            MahoutError::Io(format!("Failed to read Parquet batch: {}", e))
+        })?;
+
+        if batch.num_columns() == 0 {
+            return Err(MahoutError::Io("Parquet file has no 
columns".to_string()));
+        }
+
+        let column = batch.column(0);
+
+        // Handle List<Float64> column type
+        if let DataType::List(_) = column.data_type() {
+            let list_array = column
+                .as_any()
+                .downcast_ref::<ListArray>()
+                .ok_or_else(|| MahoutError::Io("Failed to downcast to 
ListArray".to_string()))?;
+
+            for i in 0..list_array.len() {
+                let value_array = list_array.value(i);
+                let float_array = value_array
+                    .as_any()
+                    .downcast_ref::<Float64Array>()
+                    .ok_or_else(|| MahoutError::Io("List values must be 
Float64".to_string()))?;
+
+                let current_size = float_array.len();
+
+                // Verify all samples have the same size
+                if let Some(expected_size) = sample_size {
+                    if current_size != expected_size {
+                        return Err(MahoutError::InvalidInput(format!(
+                            "Inconsistent sample sizes: expected {}, got {}",
+                            expected_size, current_size
+                        )));
+                    }
+                } else {
+                    sample_size = Some(current_size);
+                    all_data.reserve(current_size * 100); // Reserve space
+                }
+
+                // Efficiently copy the values
+                if float_array.null_count() == 0 {
+                    all_data.extend_from_slice(float_array.values());
+                } else {
+                    all_data.extend(float_array.iter().map(|opt| 
opt.unwrap_or(0.0)));
+                }
+
+                num_samples += 1;
+            }
+        } else {
+            return Err(MahoutError::Io(format!(
+                "Expected List<Float64> column, got {:?}",
+                column.data_type()
+            )));
+        }
+    }
+
+    let sample_size = sample_size.ok_or_else(|| {
+        MahoutError::Io("Parquet file contains no data".to_string())
+    })?;
+
+    Ok((all_data, num_samples, sample_size))
+}
diff --git a/qdp/qdp-core/src/lib.rs b/qdp/qdp-core/src/lib.rs
index 406715edd..65c08bd02 100644
--- a/qdp/qdp-core/src/lib.rs
+++ b/qdp/qdp-core/src/lib.rs
@@ -26,7 +26,7 @@ mod profiling;
 pub use error::{MahoutError, Result};
 
 use std::sync::Arc;
-use arrow::array::Float64Array;
+
 use cudarc::driver::CudaDevice;
 use crate::dlpack::DLManagedTensor;
 use crate::gpu::get_encoder;
@@ -88,40 +88,55 @@ impl QdpEngine {
         &self.device
     }
 
-    /// Encode from chunked Arrow arrays (zero-copy from Parquet)
+    /// Encode multiple samples in a single fused kernel (most efficient)
+    ///
+    /// Allocates one large GPU buffer and launches a single batch kernel.
+    /// This is faster than encode_batch() as it reduces allocation and kernel 
launch overhead.
     ///
     /// # Arguments
-    /// * `chunks` - Chunked Arrow Float64Arrays (from read_parquet_to_arrow)
+    /// * `batch_data` - Flattened batch data (all samples concatenated)
+    /// * `num_samples` - Number of samples in the batch
+    /// * `sample_size` - Size of each sample
     /// * `num_qubits` - Number of qubits
-    /// * `encoding_method` - Strategy: "amplitude", "angle", or "basis"
+    /// * `encoding_method` - Strategy (currently only "amplitude" supported 
for batch)
     ///
     /// # Returns
-    /// DLPack pointer for zero-copy PyTorch integration
-    pub fn encode_chunked(
+    /// Single DLPack pointer containing all encoded states (shape: 
[num_samples, 2^num_qubits])
+    pub fn encode_batch(
         &self,
-        chunks: &[Float64Array],
+        batch_data: &[f64],
+        num_samples: usize,
+        sample_size: usize,
         num_qubits: usize,
         encoding_method: &str,
     ) -> Result<*mut DLManagedTensor> {
-        crate::profile_scope!("Mahout::EncodeChunked");
+        crate::profile_scope!("Mahout::EncodeBatch");
 
         let encoder = get_encoder(encoding_method)?;
-        let state_vector = encoder.encode_chunked(&self.device, chunks, 
num_qubits)?;
-        let dlpack_ptr = {
-            crate::profile_scope!("Mahout::CreateDLPack");
-            state_vector.to_dlpack()
-        };
+        let state_vector = encoder.encode_batch(
+            &self.device,
+            batch_data,
+            num_samples,
+            sample_size,
+            num_qubits,
+        )?;
+
+        let dlpack_ptr = state_vector.to_dlpack();
         Ok(dlpack_ptr)
     }
 
     /// Load data from Parquet file and encode into quantum state
     ///
-    /// **ZERO-COPY**: Reads Parquet chunks directly without intermediate Vec 
allocation.
+    /// Reads Parquet file with List<Float64> column format and encodes all 
samples
+    /// in a single batch operation. Bypasses pandas for maximum performance.
     ///
     /// # Arguments
     /// * `path` - Path to Parquet file
     /// * `num_qubits` - Number of qubits
     /// * `encoding_method` - Strategy: "amplitude", "angle", or "basis"
+    ///
+    /// # Returns
+    /// Single DLPack pointer containing all encoded states (shape: 
[num_samples, 2^num_qubits])
     pub fn encode_from_parquet(
         &self,
         path: &str,
@@ -130,8 +145,14 @@ impl QdpEngine {
     ) -> Result<*mut DLManagedTensor> {
         crate::profile_scope!("Mahout::EncodeFromParquet");
 
-        let chunks = crate::io::read_parquet_to_arrow(path)?;
-        self.encode_chunked(&chunks, num_qubits, encoding_method)
+        // Read Parquet directly using Arrow (faster than pandas)
+        let (batch_data, num_samples, sample_size) = {
+            crate::profile_scope!("IO::ReadParquetBatch");
+            crate::io::read_parquet_batch(path)?
+        };
+
+        // Encode using fused batch kernel
+        self.encode_batch(&batch_data, num_samples, sample_size, num_qubits, 
encoding_method)
     }
 }
 
diff --git a/qdp/qdp-core/src/preprocessing.rs 
b/qdp/qdp-core/src/preprocessing.rs
index 233548a0f..0d8e70148 100644
--- a/qdp/qdp-core/src/preprocessing.rs
+++ b/qdp/qdp-core/src/preprocessing.rs
@@ -76,4 +76,59 @@ impl Preprocessor {
 
         Ok(norm)
     }
+
+    /// Validates input constraints for batch processing.
+    pub fn validate_batch(
+        batch_data: &[f64],
+        num_samples: usize,
+        sample_size: usize,
+        num_qubits: usize,
+    ) -> Result<()> {
+        if batch_data.len() != num_samples * sample_size {
+            return Err(MahoutError::InvalidInput(
+                format!("Batch data length {} doesn't match num_samples {} * 
sample_size {}",
+                    batch_data.len(), num_samples, sample_size)
+            ));
+        }
+
+        if num_qubits == 0 || num_qubits > 30 {
+            return Err(MahoutError::InvalidInput(
+                format!("Number of qubits {} must be between 1 and 30", 
num_qubits)
+            ));
+        }
+
+        let state_len = 1 << num_qubits;
+        if sample_size > state_len {
+            return Err(MahoutError::InvalidInput(
+                format!("Sample size {} exceeds state vector size {}", 
sample_size, state_len)
+            ));
+        }
+
+        Ok(())
+    }
+
+    /// Calculates L2 norms for a batch of samples in parallel.
+    pub fn calculate_batch_l2_norms(
+        batch_data: &[f64],
+        _num_samples: usize,
+        sample_size: usize,
+    ) -> Result<Vec<f64>> {
+        crate::profile_scope!("CPU::BatchL2Norm");
+
+        // Process chunks in parallel using rayon
+        batch_data
+            .par_chunks(sample_size)
+            .enumerate()
+            .map(|(i, sample)| {
+                let norm_sq: f64 = sample.iter().map(|&x| x * x).sum();
+                let norm = norm_sq.sqrt();
+                if norm == 0.0 {
+                    return Err(MahoutError::InvalidInput(
+                        format!("Sample {} has zero norm", i)
+                    ));
+                }
+                Ok(norm)
+            })
+            .collect()
+    }
 }
diff --git a/qdp/qdp-kernels/src/amplitude.cu b/qdp/qdp-kernels/src/amplitude.cu
index b1cd06fea..4dd80fb0b 100644
--- a/qdp/qdp-kernels/src/amplitude.cu
+++ b/qdp/qdp-kernels/src/amplitude.cu
@@ -110,6 +110,127 @@ int launch_amplitude_encode(
     return (int)cudaGetLastError();
 }
 
+/// Optimized batch amplitude encoding kernel
+///
+/// Memory Layout (row-major):
+/// - input_batch: [sample0_data | sample1_data | ... | sampleN_data]
+/// - state_batch: [sample0_state | sample1_state | ... | sampleN_state]
+///
+/// Optimizations:
+/// 1. Vectorized double2 loads for 128-bit memory transactions
+/// 2. Grid-stride loop for arbitrary batch sizes
+/// 3. Coalesced memory access within warps
+/// 4. Minimized register pressure
+__global__ void amplitude_encode_batch_kernel(
+    const double* __restrict__ input_batch,
+    cuDoubleComplex* __restrict__ state_batch,
+    const double* __restrict__ inv_norms,
+    size_t num_samples,
+    size_t input_len,
+    size_t state_len
+) {
+    // Grid-stride loop pattern for flexibility
+    const size_t elements_per_sample = state_len / 2;  // Each thread handles 
2 elements
+    const size_t total_work = num_samples * elements_per_sample;
+    const size_t stride = gridDim.x * blockDim.x;
+
+    size_t global_idx = blockIdx.x * blockDim.x + threadIdx.x;
+
+    // Process elements in grid-stride fashion
+    for (size_t idx = global_idx; idx < total_work; idx += stride) {
+        // Decompose linear index into (sample, element_pair)
+        const size_t sample_idx = idx / elements_per_sample;
+        const size_t elem_pair = idx % elements_per_sample;
+
+        // Calculate base addresses (strength-reduced)
+        const size_t input_base = sample_idx * input_len;
+        const size_t state_base = sample_idx * state_len;
+        const size_t elem_offset = elem_pair * 2;
+
+        // Load inverse norm (cached by L1)
+        const double inv_norm = inv_norms[sample_idx];
+
+        // Vectorized load: read 2 doubles as double2 for 128-bit transaction
+        double v1, v2;
+        if (elem_offset + 1 < input_len) {
+            // Aligned vectorized load
+            const double2 vec_data = __ldg(reinterpret_cast<const 
double2*>(input_batch + input_base) + elem_pair);
+            v1 = vec_data.x;
+            v2 = vec_data.y;
+        } else if (elem_offset < input_len) {
+            // Edge case: single element load
+            v1 = __ldg(input_batch + input_base + elem_offset);
+            v2 = 0.0;
+        } else {
+            // Padding region
+            v1 = v2 = 0.0;
+        }
+
+        // Normalize and write as complex numbers
+        // Compiler will optimize multiplications
+        const cuDoubleComplex c1 = make_cuDoubleComplex(v1 * inv_norm, 0.0);
+        const cuDoubleComplex c2 = make_cuDoubleComplex(v2 * inv_norm, 0.0);
+
+        // Write to global memory (coalesced within warp)
+        state_batch[state_base + elem_offset] = c1;
+        if (elem_offset + 1 < state_len) {
+            state_batch[state_base + elem_offset + 1] = c2;
+        }
+    }
+}
+
+/// Launch optimized batch amplitude encoding kernel
+///
+/// # Arguments
+/// * input_batch_d - Device pointer to batch input data
+/// * state_batch_d - Device pointer to output batch state vectors
+/// * inv_norms_d - Device pointer to inverse norms array
+/// * num_samples - Number of samples in batch
+/// * input_len - Elements per sample
+/// * state_len - State vector size per sample (2^num_qubits)
+/// * stream - CUDA stream for async execution
+///
+/// # Returns
+/// CUDA error code (0 = cudaSuccess)
+int launch_amplitude_encode_batch(
+    const double* input_batch_d,
+    void* state_batch_d,
+    const double* inv_norms_d,
+    size_t num_samples,
+    size_t input_len,
+    size_t state_len,
+    cudaStream_t stream
+) {
+    if (num_samples == 0 || state_len == 0) {
+        return cudaErrorInvalidValue;
+    }
+
+    cuDoubleComplex* state_complex_d = 
static_cast<cuDoubleComplex*>(state_batch_d);
+
+    // Optimal configuration for modern GPUs (SM 7.0+)
+    // - Block size: 256 threads (8 warps, good occupancy)
+    // - Grid size: Enough blocks to saturate GPU, but not excessive
+    const int blockSize = 256;
+    const size_t total_work = num_samples * (state_len / 2);
+
+    // Calculate grid size: aim for high occupancy without too many blocks
+    // Limit to reasonable number of blocks to avoid scheduler overhead
+    const size_t blocks_needed = (total_work + blockSize - 1) / blockSize;
+    const size_t max_blocks = 2048;  // Reasonable limit for most GPUs
+    const size_t gridSize = (blocks_needed < max_blocks) ? blocks_needed : 
max_blocks;
+
+    amplitude_encode_batch_kernel<<<gridSize, blockSize, 0, stream>>>(
+        input_batch_d,
+        state_complex_d,
+        inv_norms_d,
+        num_samples,
+        input_len,
+        state_len
+    );
+
+    return (int)cudaGetLastError();
+}
+
 // TODO: Future encoding methods:
 // - launch_angle_encode (angle encoding)
 // - launch_basis_encode (basis encoding)
diff --git a/qdp/qdp-kernels/src/lib.rs b/qdp/qdp-kernels/src/lib.rs
index a59733fb8..c1bb07544 100644
--- a/qdp/qdp-kernels/src/lib.rs
+++ b/qdp/qdp-kernels/src/lib.rs
@@ -53,6 +53,21 @@ unsafe extern "C" {
         stream: *mut c_void,
     ) -> i32;
 
+    /// Launch batch amplitude encoding kernel
+    /// Returns CUDA error code (0 = success)
+    ///
+    /// # Safety
+    /// Requires valid GPU pointers, must sync before freeing
+    pub fn launch_amplitude_encode_batch(
+        input_batch_d: *const f64,
+        state_batch_d: *mut c_void,
+        inv_norms_d: *const f64,
+        num_samples: usize,
+        input_len: usize,
+        state_len: usize,
+        stream: *mut c_void,
+    ) -> i32;
+
     // TODO: launch_angle_encode, launch_basis_encode
 }
 
diff --git a/qdp/qdp-python/src/lib.rs b/qdp/qdp-python/src/lib.rs
index 4ff1e4b02..cd3fcc3ac 100644
--- a/qdp/qdp-python/src/lib.rs
+++ b/qdp/qdp-python/src/lib.rs
@@ -181,25 +181,26 @@ impl QdpEngine {
         })
     }
 
-    /// Load data from Parquet file and encode into quantum state
+    /// Encode from Parquet file (FASTEST - recommended for batches)
     ///
-    /// **ZERO-COPY**: Reads Parquet chunks directly without intermediate Vec 
allocation.
+    /// Direct Parquet→GPU pipeline:
+    /// - Reads List<Float64> column format using Arrow
+    /// - Zero-copy data extraction
+    /// - Single optimized batch kernel launch
+    /// - Returns batched tensor (shape: [num_samples, 2^num_qubits])
     ///
     /// Args:
     ///     path: Path to Parquet file
     ///     num_qubits: Number of qubits for encoding
-    ///     encoding_method: Encoding strategy ("amplitude", "angle", or 
"basis")
+    ///     encoding_method: Encoding strategy (currently only "amplitude")
     ///
     /// Returns:
-    ///     QuantumTensor: DLPack-compatible tensor for zero-copy PyTorch 
integration
-    ///
-    /// Raises:
-    ///     RuntimeError: If encoding fails
+    ///     QuantumTensor: DLPack tensor containing all encoded states
     ///
     /// Example:
     ///     >>> engine = QdpEngine(device_id=0)
-    ///     >>> qtensor = engine.encode_from_parquet("data.parquet", 
num_qubits=2, encoding_method="amplitude")
-    ///     >>> torch_tensor = torch.from_dlpack(qtensor)
+    ///     >>> batched = engine.encode_from_parquet("data.parquet", 16, 
"amplitude")
+    ///     >>> torch_tensor = torch.from_dlpack(batched)  # Shape: [200, 
65536]
     fn encode_from_parquet(&self, path: &str, num_qubits: usize, 
encoding_method: &str) -> PyResult<QuantumTensor> {
         let ptr = self.engine.encode_from_parquet(path, num_qubits, 
encoding_method)
             .map_err(|e| PyRuntimeError::new_err(format!("Encoding from 
parquet failed: {}", e)))?;


Reply via email to