This is an automated email from the ASF dual-hosted git repository.

guanmingchiu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/mahout.git

commit c27f98e0018a20bdcfe7269362eaa67b19045d7f
Author: Guan-Ming (Wesley) Chiu <[email protected]>
AuthorDate: Wed Dec 10 17:31:28 2025 +0800

    [QDP] Add Arrow IPC support (#704)
    
    * Add Arrow IPC support
    
    * Update default frameworks to "mahout-parquet"
    
    * Add tests for Arrow IPC
    
    * Add todo to prevent oom
    
    * Add TODO for supporting multiple float types
---
 qdp/benchmark/benchmark_e2e_final.py | 131 +++++++++++++------
 qdp/qdp-core/src/io.rs               | 231 +++++++++++++++++++++------------
 qdp/qdp-core/src/lib.rs              |  31 +++++
 qdp/qdp-core/tests/arrow_ipc_io.rs   | 239 +++++++++++++++++++++++++++++++++++
 qdp/qdp-python/src/lib.rs            |  31 ++++-
 5 files changed, 538 insertions(+), 125 deletions(-)

diff --git a/qdp/benchmark/benchmark_e2e_final.py 
b/qdp/benchmark/benchmark_e2e_final.py
index 27efab93d..c00476b2d 100644
--- a/qdp/benchmark/benchmark_e2e_final.py
+++ b/qdp/benchmark/benchmark_e2e_final.py
@@ -37,6 +37,7 @@ import os
 import itertools
 import pyarrow as pa
 import pyarrow.parquet as pq
+import pyarrow.ipc as ipc
 from mahout_qdp import QdpEngine
 
 # Competitors
@@ -57,6 +58,7 @@ except ImportError:
 
 # Config
 DATA_FILE = "final_benchmark_data.parquet"
+ARROW_FILE = "final_benchmark_data.arrow"
 HIDDEN_DIM = 16
 BATCH_SIZE = 64  # Small batch to stress loop overhead
 
@@ -71,28 +73,34 @@ class DummyQNN(nn.Module):
 
 
 def generate_data(n_qubits, n_samples):
-    if os.path.exists(DATA_FILE):
-        os.remove(DATA_FILE)
+    for f in [DATA_FILE, ARROW_FILE]:
+        if os.path.exists(f):
+            os.remove(f)
 
     print(f"Generating {n_samples} samples of {n_qubits} qubits...")
     dim = 1 << n_qubits
 
-    # Generate for PennyLane/Qiskit (List format)
-    chunk_size = 500
-    schema_list = pa.schema([("feature_vector", pa.list_(pa.float64()))])
+    # Generate all data at once
+    np.random.seed(42)
+    all_data = np.random.rand(n_samples, dim).astype(np.float64)
 
-    with pq.ParquetWriter(DATA_FILE, schema_list) as writer:
-        for start_idx in range(0, n_samples, chunk_size):
-            current = min(chunk_size, n_samples - start_idx)
-            data = np.random.rand(current, dim).astype(np.float64)
-            feature_vectors = [row.tolist() for row in data]
-            arrays = pa.array(feature_vectors, type=pa.list_(pa.float64()))
-            batch_table = pa.Table.from_arrays([arrays], 
names=["feature_vector"])
-            writer.write_table(batch_table)
+    # Save as Parquet (List format for PennyLane/Qiskit)
+    feature_vectors = [row.tolist() for row in all_data]
+    table = pa.table(
+        {"feature_vector": pa.array(feature_vectors, 
type=pa.list_(pa.float64()))}
+    )
+    pq.write_table(table, DATA_FILE)
+
+    # Save as Arrow IPC (FixedSizeList format for Mahout)
+    arr = pa.FixedSizeListArray.from_arrays(pa.array(all_data.flatten()), dim)
+    arrow_table = pa.table({"data": arr})
+    with ipc.RecordBatchFileWriter(ARROW_FILE, arrow_table.schema) as writer:
+        writer.write_table(arrow_table)
 
-    file_size_mb = os.path.getsize(DATA_FILE) / (1024 * 1024)
+    parquet_size = os.path.getsize(DATA_FILE) / (1024 * 1024)
+    arrow_size = os.path.getsize(ARROW_FILE) / (1024 * 1024)
     print(f"  Generated {n_samples} samples")
-    print(f"  Parquet file size: {file_size_mb:.2f} MB")
+    print(f"  Parquet: {parquet_size:.2f} MB, Arrow IPC: {arrow_size:.2f} MB")
 
 
 # -----------------------------------------------------------
@@ -220,10 +228,10 @@ def run_pennylane(n_qubits, n_samples):
 
 
 # -----------------------------------------------------------
-# 3. Mahout Full Pipeline
+# 3. Mahout Parquet Pipeline
 # -----------------------------------------------------------
-def run_mahout(engine, n_qubits, n_samples):
-    print("\n[Mahout] Full Pipeline (Disk -> GPU)...")
+def run_mahout_parquet(engine, n_qubits, n_samples):
+    print("\n[Mahout-Parquet] Full Pipeline (Parquet -> GPU)...")
     model = DummyQNN(n_qubits).cuda()
 
     torch.cuda.synchronize()
@@ -262,6 +270,44 @@ def run_mahout(engine, n_qubits, n_samples):
     return total_time, gpu_reshaped
 
 
+# -----------------------------------------------------------
+# 4. Mahout Arrow IPC Pipeline
+# -----------------------------------------------------------
+def run_mahout_arrow(engine, n_qubits, n_samples):
+    print("\n[Mahout-Arrow] Full Pipeline (Arrow IPC -> GPU)...")
+    model = DummyQNN(n_qubits).cuda()
+
+    torch.cuda.synchronize()
+    start_time = time.perf_counter()
+
+    arrow_encode_start = time.perf_counter()
+    batched_tensor = engine.encode_from_arrow_ipc(ARROW_FILE, n_qubits, 
"amplitude")
+    arrow_encode_time = time.perf_counter() - arrow_encode_start
+    print(f"  Arrow->GPU (IO+Encode): {arrow_encode_time:.4f} s")
+
+    dlpack_start = time.perf_counter()
+    gpu_batched = torch.from_dlpack(batched_tensor)
+    dlpack_time = time.perf_counter() - dlpack_start
+    print(f"  DLPack conversion: {dlpack_time:.4f} s")
+
+    state_len = 1 << n_qubits
+    gpu_reshaped = gpu_batched.view(n_samples, state_len)
+
+    reshape_start = time.perf_counter()
+    gpu_all_data = gpu_reshaped.abs().to(torch.float32)
+    reshape_time = time.perf_counter() - reshape_start
+    print(f"  Reshape & convert: {reshape_time:.4f} s")
+
+    for i in range(0, n_samples, BATCH_SIZE):
+        batch = gpu_all_data[i : i + BATCH_SIZE]
+        _ = model(batch)
+
+    torch.cuda.synchronize()
+    total_time = time.perf_counter() - start_time
+    print(f"  Total Time: {total_time:.4f} s")
+    return total_time, gpu_reshaped
+
+
 def compare_states(name_a, states_a, name_b, states_b):
     print("\n" + "=" * 70)
     print(f"VERIFICATION ({name_a} vs {name_b})")
@@ -314,22 +360,15 @@ if __name__ == "__main__":
     parser.add_argument(
         "--frameworks",
         nargs="+",
-        default=["mahout", "pennylane"],
-        choices=["mahout", "pennylane", "qiskit", "all"],
-        help="Frameworks to benchmark (default: mahout pennylane). Use 'all' 
to run all available frameworks.",
+        default=["mahout-parquet", "pennylane"],
+        choices=["mahout-parquet", "mahout-arrow", "pennylane", "qiskit", 
"all"],
+        help="Frameworks to benchmark. Use 'all' to run all available 
frameworks.",
     )
     args = parser.parse_args()
 
     # Expand "all" option
     if "all" in args.frameworks:
-        all_frameworks = []
-        if "mahout" in args.frameworks or "all" in args.frameworks:
-            all_frameworks.append("mahout")
-        if "pennylane" in args.frameworks or "all" in args.frameworks:
-            all_frameworks.append("pennylane")
-        if "qiskit" in args.frameworks or "all" in args.frameworks:
-            all_frameworks.append("qiskit")
-        args.frameworks = all_frameworks
+        args.frameworks = ["mahout-parquet", "mahout-arrow", "pennylane", 
"qiskit"]
 
     generate_data(args.qubits, args.samples)
 
@@ -345,7 +384,8 @@ if __name__ == "__main__":
 
     # Initialize results
     t_pl, pl_all_states = 0.0, None
-    t_mahout, mahout_all_states = 0.0, None
+    t_mahout_parquet, mahout_parquet_all_states = 0.0, None
+    t_mahout_arrow, mahout_arrow_all_states = 0.0, None
     t_qiskit, qiskit_all_states = 0.0, None
 
     # Run benchmarks
@@ -355,8 +395,15 @@ if __name__ == "__main__":
     if "qiskit" in args.frameworks:
         t_qiskit, qiskit_all_states = run_qiskit(args.qubits, args.samples)
 
-    if "mahout" in args.frameworks:
-        t_mahout, mahout_all_states = run_mahout(engine, args.qubits, 
args.samples)
+    if "mahout-parquet" in args.frameworks:
+        t_mahout_parquet, mahout_parquet_all_states = run_mahout_parquet(
+            engine, args.qubits, args.samples
+        )
+
+    if "mahout-arrow" in args.frameworks:
+        t_mahout_arrow, mahout_arrow_all_states = run_mahout_arrow(
+            engine, args.qubits, args.samples
+        )
 
     print("\n" + "=" * 70)
     print("E2E LATENCY (Lower is Better)")
@@ -364,8 +411,10 @@ if __name__ == "__main__":
     print("=" * 70)
 
     results = []
-    if t_mahout > 0:
-        results.append(("Mahout", t_mahout))
+    if t_mahout_parquet > 0:
+        results.append(("Mahout-Parquet", t_mahout_parquet))
+    if t_mahout_arrow > 0:
+        results.append(("Mahout-Arrow", t_mahout_arrow))
     if t_pl > 0:
         results.append(("PennyLane", t_pl))
     if t_qiskit > 0:
@@ -374,19 +423,23 @@ if __name__ == "__main__":
     results.sort(key=lambda x: x[1])
 
     for name, time_val in results:
-        print(f"{name:12s} {time_val:10.4f} s")
+        print(f"{name:16s} {time_val:10.4f} s")
 
     print("-" * 70)
-    if t_mahout > 0:
+    # Use fastest Mahout variant for speedup comparison
+    mahout_times = [t for t in [t_mahout_arrow, t_mahout_parquet] if t > 0]
+    t_mahout_best = min(mahout_times) if mahout_times else 0
+    if t_mahout_best > 0:
         if t_pl > 0:
-            print(f"Speedup vs PennyLane: {t_pl / t_mahout:10.2f}x")
+            print(f"Speedup vs PennyLane: {t_pl / t_mahout_best:10.2f}x")
         if t_qiskit > 0:
-            print(f"Speedup vs Qiskit:    {t_qiskit / t_mahout:10.2f}x")
+            print(f"Speedup vs Qiskit:    {t_qiskit / t_mahout_best:10.2f}x")
 
     # Run Verification after benchmarks
     verify_correctness(
         {
-            "Mahout": mahout_all_states,
+            "Mahout-Parquet": mahout_parquet_all_states,
+            "Mahout-Arrow": mahout_arrow_all_states,
             "PennyLane": pl_all_states,
             "Qiskit": qiskit_all_states,
         }
diff --git a/qdp/qdp-core/src/io.rs b/qdp/qdp-core/src/io.rs
index 93ad1d018..372f4ef75 100644
--- a/qdp/qdp-core/src/io.rs
+++ b/qdp/qdp-core/src/io.rs
@@ -14,25 +14,28 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-//! I/O module for reading and writing quantum data
+//! I/O utilities for reading and writing quantum data.
 //!
-//! This module provides efficient columnar data exchange with the data 
science ecosystem,
+//! Provides efficient columnar data exchange via Apache Arrow and Parquet 
formats.
+//!
+//! # TODO
+//! Consider using generic `T: ArrowPrimitiveType` instead of hardcoded 
`Float64Array`
+//! to support both Float32 and Float64 for flexibility in precision vs 
performance trade-offs.
 
 use std::fs::File;
 use std::path::Path;
 use std::sync::Arc;
 
-use arrow::array::{Array, ArrayRef, Float64Array, ListArray, RecordBatch};
+use arrow::array::{Array, ArrayRef, Float64Array, FixedSizeListArray, 
ListArray, RecordBatch};
 use arrow::datatypes::{DataType, Field, Schema};
+use arrow::ipc::reader::FileReader as ArrowFileReader;
 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
 use parquet::arrow::ArrowWriter;
 use parquet::file::properties::WriterProperties;
 
 use crate::error::{MahoutError, Result};
 
-/// Convert Arrow Float64Array to Vec<f64>
-///
-/// Uses Arrow's internal buffer directly if no nulls, otherwise copies
+/// Converts an Arrow Float64Array to Vec<f64>.
 pub fn arrow_to_vec(array: &Float64Array) -> Vec<f64> {
     if array.null_count() == 0 {
         array.values().to_vec()
@@ -41,9 +44,7 @@ pub fn arrow_to_vec(array: &Float64Array) -> Vec<f64> {
     }
 }
 
-/// Convert chunked Arrow Float64Array to Vec<f64>
-///
-/// Efficiently flattens multiple Arrow arrays into a single Vec
+/// Flattens multiple Arrow Float64Arrays into a single Vec<f64>.
 pub fn arrow_to_vec_chunked(arrays: &[Float64Array]) -> Vec<f64> {
     let total_len: usize = arrays.iter().map(|a| a.len()).sum();
     let mut result = Vec::with_capacity(total_len);
@@ -59,45 +60,20 @@ pub fn arrow_to_vec_chunked(arrays: &[Float64Array]) -> 
Vec<f64> {
     result
 }
 
-/// Reads quantum data from a Parquet file.
-///
-/// Expects a single column named "data" containing Float64 values.
-/// This function performs one copy from Arrow to Vec.
-/// use `read_parquet_to_arrow` instead.
-///
-/// # Arguments
-/// * `path` - Path to the Parquet file
+/// Reads Float64 data from a Parquet file.
 ///
-/// # Returns
-/// Vector of f64 values from the first column
-///
-/// # Example
-/// ```no_run
-/// use qdp_core::io::read_parquet;
-///
-/// let data = read_parquet("quantum_data.parquet").unwrap();
-/// ```
+/// Expects a single Float64 column. For zero-copy access, use 
[`read_parquet_to_arrow`].
 pub fn read_parquet<P: AsRef<Path>>(path: P) -> Result<Vec<f64>> {
     let chunks = read_parquet_to_arrow(path)?;
     Ok(arrow_to_vec_chunked(&chunks))
 }
 
-/// Writes quantum data to a Parquet file.
-///
-/// Creates a single column named "data" containing Float64 values.
+/// Writes Float64 data to a Parquet file.
 ///
 /// # Arguments
-/// * `path` - Path to write the Parquet file
-/// * `data` - Vector of f64 values to write
-/// * `column_name` - Optional column name (defaults to "data")
-///
-/// # Example
-/// ```no_run
-/// use qdp_core::io::write_parquet;
-///
-/// let data = vec![0.5, 0.5, 0.5, 0.5];
-/// write_parquet("quantum_data.parquet", &data, None).unwrap();
-/// ```
+/// * `path` - Output file path
+/// * `data` - Data to write
+/// * `column_name` - Column name (defaults to "data")
 pub fn write_parquet<P: AsRef<Path>>(
     path: P,
     data: &[f64],
@@ -111,23 +87,19 @@ pub fn write_parquet<P: AsRef<Path>>(
 
     let col_name = column_name.unwrap_or("data");
 
-    // Create Arrow schema
     let schema = Arc::new(Schema::new(vec![Field::new(
         col_name,
         DataType::Float64,
         false,
     )]));
 
-    // Create Float64Array from slice
     let array = Float64Array::from_iter_values(data.iter().copied());
     let array_ref: ArrayRef = Arc::new(array);
 
-    // Create RecordBatch
     let batch = RecordBatch::try_new(schema.clone(), 
vec![array_ref]).map_err(|e| {
         MahoutError::Io(format!("Failed to create RecordBatch: {}", e))
     })?;
 
-    // Write to Parquet file
     let file = File::create(path.as_ref()).map_err(|e| {
         MahoutError::Io(format!("Failed to create Parquet file: {}", e))
     })?;
@@ -148,18 +120,9 @@ pub fn write_parquet<P: AsRef<Path>>(
     Ok(())
 }
 
-/// Reads quantum data from a Parquet file as Arrow arrays.
-///
-/// Returns Arrow arrays directly from Parquet batches.
-/// Each element in the returned Vec corresponds to one Parquet batch.
-///
-/// Directly constructs the Arrow array from Parquet batches
-///
-/// # Arguments
-/// * `path` - Path to the Parquet file
+/// Reads a Parquet file as Arrow Float64Arrays.
 ///
-/// # Returns
-/// Vector of Float64Arrays, one per Parquet batch
+/// Returns one array per record batch (not per row group) for zero-copy access.
 pub fn read_parquet_to_arrow<P: AsRef<Path>>(path: P) -> 
Result<Vec<Float64Array>> {
     let file = File::open(path.as_ref()).map_err(|e| {
         MahoutError::Io(format!("Failed to open Parquet file: {}", e))
@@ -194,7 +157,6 @@ pub fn read_parquet_to_arrow<P: AsRef<Path>>(path: P) -> 
Result<Vec<Float64Array
             )));
         }
 
-        // Clone the Float64Array (reference-counted, no data copy)
         let float_array = column
             .as_any()
             .downcast_ref::<Float64Array>()
@@ -217,12 +179,10 @@ pub fn read_parquet_to_arrow<P: AsRef<Path>>(path: P) -> 
Result<Vec<Float64Array
 
 /// Writes an Arrow Float64Array to a Parquet file.
 ///
-/// Writes from Arrow format to Parquet.
-///
 /// # Arguments
-/// * `path` - Path to write the Parquet file
-/// * `array` - Float64Array to write
-/// * `column_name` - Optional column name (defaults to "data")
+/// * `path` - Output file path
+/// * `array` - Array to write
+/// * `column_name` - Column name (defaults to "data")
 pub fn write_arrow_to_parquet<P: AsRef<Path>>(
     path: P,
     array: &Float64Array,
@@ -236,7 +196,6 @@ pub fn write_arrow_to_parquet<P: AsRef<Path>>(
 
     let col_name = column_name.unwrap_or("data");
 
-    // Create Arrow schema
     let schema = Arc::new(Schema::new(vec![Field::new(
         col_name,
         DataType::Float64,
@@ -244,13 +203,10 @@ pub fn write_arrow_to_parquet<P: AsRef<Path>>(
     )]));
 
     let array_ref: ArrayRef = Arc::new(array.clone());
-
-    // Create RecordBatch
     let batch = RecordBatch::try_new(schema.clone(), 
vec![array_ref]).map_err(|e| {
         MahoutError::Io(format!("Failed to create RecordBatch: {}", e))
     })?;
 
-    // Write to Parquet file
     let file = File::create(path.as_ref()).map_err(|e| {
         MahoutError::Io(format!("Failed to create Parquet file: {}", e))
     })?;
@@ -271,20 +227,15 @@ pub fn write_arrow_to_parquet<P: AsRef<Path>>(
     Ok(())
 }
 
-/// Read batch data from Parquet file with list column format
+/// Reads batch data from a Parquet file with `List<Float64>` column format.
 ///
-/// Efficiently reads Parquet files where each row contains a list of values.
-/// Returns a flattened Vec with all samples concatenated, suitable for batch 
encoding.
-///
-/// # Arguments
-/// * `path` - Path to Parquet file
+/// Returns flattened data suitable for batch encoding.
 ///
 /// # Returns
-/// Tuple of (flattened_data, num_samples, sample_size)
+/// Tuple of `(flattened_data, num_samples, sample_size)`
 ///
-/// # Example
-/// File format: column "feature_vector" with type List<Float64>
-/// Each row = one sample = one list of floats
+/// # TODO
+/// Add OOM protection for very large files
 pub fn read_parquet_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize, 
usize)> {
     let file = File::open(path.as_ref()).map_err(|e| {
         MahoutError::Io(format!("Failed to open Parquet file: {}", e))
@@ -294,6 +245,8 @@ pub fn read_parquet_batch<P: AsRef<Path>>(path: P) -> 
Result<(Vec<f64>, usize, u
         MahoutError::Io(format!("Failed to create Parquet reader: {}", e))
     })?;
 
+    let total_rows = builder.metadata().file_metadata().num_rows() as usize;
+
     let mut reader = builder.build().map_err(|e| {
         MahoutError::Io(format!("Failed to build Parquet reader: {}", e))
     })?;
@@ -313,7 +266,6 @@ pub fn read_parquet_batch<P: AsRef<Path>>(path: P) -> 
Result<(Vec<f64>, usize, u
 
         let column = batch.column(0);
 
-        // Handle List<Float64> column type
         if let DataType::List(_) = column.data_type() {
             let list_array = column
                 .as_any()
@@ -329,7 +281,6 @@ pub fn read_parquet_batch<P: AsRef<Path>>(path: P) -> 
Result<(Vec<f64>, usize, u
 
                 let current_size = float_array.len();
 
-                // Verify all samples have the same size
                 if let Some(expected_size) = sample_size {
                     if current_size != expected_size {
                         return Err(MahoutError::InvalidInput(format!(
@@ -339,10 +290,9 @@ pub fn read_parquet_batch<P: AsRef<Path>>(path: P) -> 
Result<(Vec<f64>, usize, u
                     }
                 } else {
                     sample_size = Some(current_size);
-                    all_data.reserve(current_size * 100); // Reserve space
+                    all_data.reserve(current_size * total_rows);
                 }
 
-                // Efficiently copy the values
                 if float_array.null_count() == 0 {
                     all_data.extend_from_slice(float_array.values());
                 } else {
@@ -365,3 +315,126 @@ pub fn read_parquet_batch<P: AsRef<Path>>(path: P) -> 
Result<(Vec<f64>, usize, u
 
     Ok((all_data, num_samples, sample_size))
 }
+
+/// Reads batch data from an Arrow IPC file.
+///
+/// Supports `FixedSizeList<Float64>` and `List<Float64>` column formats.
+/// Returns flattened data suitable for batch encoding.
+///
+/// # Returns
+/// Tuple of `(flattened_data, num_samples, sample_size)`
+///
+/// # TODO
+/// Add OOM protection for very large files
+pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, 
usize, usize)> {
+    let file = File::open(path.as_ref()).map_err(|e| {
+        MahoutError::Io(format!("Failed to open Arrow IPC file: {}", e))
+    })?;
+
+    let reader = ArrowFileReader::try_new(file, None).map_err(|e| {
+        MahoutError::Io(format!("Failed to create Arrow IPC reader: {}", e))
+    })?;
+
+    let mut all_data = Vec::new();
+    let mut num_samples = 0;
+    let mut sample_size: Option<usize> = None;
+
+    for batch_result in reader {
+        let batch = batch_result.map_err(|e| {
+            MahoutError::Io(format!("Failed to read Arrow batch: {}", e))
+        })?;
+
+        if batch.num_columns() == 0 {
+            return Err(MahoutError::Io("Arrow file has no 
columns".to_string()));
+        }
+
+        let column = batch.column(0);
+
+        match column.data_type() {
+            DataType::FixedSizeList(_, size) => {
+                let list_array = column
+                    .as_any()
+                    .downcast_ref::<FixedSizeListArray>()
+                    .ok_or_else(|| MahoutError::Io("Failed to downcast to 
FixedSizeListArray".to_string()))?;
+
+                let current_size = *size as usize;
+
+                if let Some(expected) = sample_size {
+                    if current_size != expected {
+                        return Err(MahoutError::InvalidInput(format!(
+                            "Inconsistent sample sizes: expected {}, got {}",
+                            expected, current_size
+                        )));
+                    }
+                } else {
+                    sample_size = Some(current_size);
+                    all_data.reserve(current_size * batch.num_rows());
+                }
+
+                let values = list_array.values();
+                let float_array = values
+                    .as_any()
+                    .downcast_ref::<Float64Array>()
+                    .ok_or_else(|| MahoutError::Io("Values must be 
Float64".to_string()))?;
+
+                if float_array.null_count() == 0 {
+                    all_data.extend_from_slice(float_array.values());
+                } else {
+                    all_data.extend(float_array.iter().map(|opt| 
opt.unwrap_or(0.0)));
+                }
+
+                num_samples += list_array.len();
+            }
+
+            DataType::List(_) => {
+                let list_array = column
+                    .as_any()
+                    .downcast_ref::<ListArray>()
+                    .ok_or_else(|| MahoutError::Io("Failed to downcast to 
ListArray".to_string()))?;
+
+                for i in 0..list_array.len() {
+                    let value_array = list_array.value(i);
+                    let float_array = value_array
+                        .as_any()
+                        .downcast_ref::<Float64Array>()
+                        .ok_or_else(|| MahoutError::Io("List values must be 
Float64".to_string()))?;
+
+                    let current_size = float_array.len();
+
+                    if let Some(expected) = sample_size {
+                        if current_size != expected {
+                            return Err(MahoutError::InvalidInput(format!(
+                                "Inconsistent sample sizes: expected {}, got 
{}",
+                                expected, current_size
+                            )));
+                        }
+                    } else {
+                        sample_size = Some(current_size);
+                        all_data.reserve(current_size * list_array.len());
+                    }
+
+                    if float_array.null_count() == 0 {
+                        all_data.extend_from_slice(float_array.values());
+                    } else {
+                        all_data.extend(float_array.iter().map(|opt| 
opt.unwrap_or(0.0)));
+                    }
+
+                    num_samples += 1;
+                }
+            }
+
+            _ => {
+                return Err(MahoutError::Io(format!(
+                    "Expected FixedSizeList<Float64> or List<Float64>, got 
{:?}",
+                    column.data_type()
+                )));
+            }
+        }
+    }
+
+    let sample_size = sample_size.ok_or_else(|| {
+        MahoutError::Io("Arrow file contains no data".to_string())
+    })?;
+
+    Ok((all_data, num_samples, sample_size))
+}
diff --git a/qdp/qdp-core/src/lib.rs b/qdp/qdp-core/src/lib.rs
index 65c08bd02..2f8f42092 100644
--- a/qdp/qdp-core/src/lib.rs
+++ b/qdp/qdp-core/src/lib.rs
@@ -154,6 +154,37 @@ impl QdpEngine {
         // Encode using fused batch kernel
         self.encode_batch(&batch_data, num_samples, sample_size, num_qubits, 
encoding_method)
     }
+
+    /// Load data from Arrow IPC file and encode into quantum state
+    ///
+    /// Supports:
+    /// - FixedSizeList<Float64> - fastest, all samples same size
+    /// - List<Float64> - flexible layout, but every sample must have the same length
+    ///
+    /// # Arguments
+    /// * `path` - Path to Arrow IPC file (.arrow or .feather)
+    /// * `num_qubits` - Number of qubits
+    /// * `encoding_method` - Strategy: "amplitude", "angle", or "basis"
+    ///
+    /// # Returns
+    /// Single DLPack pointer containing all encoded states (shape: 
[num_samples, 2^num_qubits])
+    pub fn encode_from_arrow_ipc(
+        &self,
+        path: &str,
+        num_qubits: usize,
+        encoding_method: &str,
+    ) -> Result<*mut DLManagedTensor> {
+        crate::profile_scope!("Mahout::EncodeFromArrowIPC");
+
+        // Read Arrow IPC (6x faster than Parquet)
+        let (batch_data, num_samples, sample_size) = {
+            crate::profile_scope!("IO::ReadArrowIPCBatch");
+            crate::io::read_arrow_ipc_batch(path)?
+        };
+
+        // Encode using fused batch kernel
+        self.encode_batch(&batch_data, num_samples, sample_size, num_qubits, 
encoding_method)
+    }
 }
 
 // Re-export key types for convenience
diff --git a/qdp/qdp-core/tests/arrow_ipc_io.rs 
b/qdp/qdp-core/tests/arrow_ipc_io.rs
new file mode 100644
index 000000000..6ef206954
--- /dev/null
+++ b/qdp/qdp-core/tests/arrow_ipc_io.rs
@@ -0,0 +1,239 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use qdp_core::io::{read_arrow_ipc_batch, read_parquet_batch};
+use arrow::array::{Float64Array, FixedSizeListArray};
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow::ipc::writer::FileWriter as ArrowFileWriter;
+use std::fs::{self, File};
+use std::sync::Arc;
+
+mod common;
+
+#[test]
+fn test_read_arrow_ipc_fixed_size_list() {
+    let temp_path = "/tmp/test_arrow_ipc_fixed.arrow";
+    let num_samples = 10;
+    let sample_size = 16;
+
+    // Create test data
+    let mut all_values = Vec::new();
+    for i in 0..num_samples {
+        for j in 0..sample_size {
+            all_values.push((i * sample_size + j) as f64);
+        }
+    }
+
+    // Write Arrow IPC with FixedSizeList format
+    let values_array = Float64Array::from(all_values.clone());
+    let field = Arc::new(Field::new("item", DataType::Float64, false));
+    let list_array = FixedSizeListArray::new(
+        field,
+        sample_size as i32,
+        Arc::new(values_array),
+        None,
+    );
+
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        "data",
+        DataType::FixedSizeList(
+            Arc::new(Field::new("item", DataType::Float64, false)),
+            sample_size as i32,
+        ),
+        false,
+    )]));
+
+    let batch = arrow::record_batch::RecordBatch::try_new(
+        schema.clone(),
+        vec![Arc::new(list_array)],
+    )
+    .unwrap();
+
+    let file = File::create(temp_path).unwrap();
+    let mut writer = ArrowFileWriter::try_new(file, &schema).unwrap();
+    writer.write(&batch).unwrap();
+    writer.finish().unwrap();
+
+    // Read and verify
+    let (data, samples, size) = read_arrow_ipc_batch(temp_path).unwrap();
+
+    assert_eq!(samples, num_samples);
+    assert_eq!(size, sample_size);
+    assert_eq!(data.len(), num_samples * sample_size);
+
+    for (i, &val) in data.iter().enumerate() {
+        assert_eq!(val, i as f64);
+    }
+
+    // Cleanup
+    fs::remove_file(temp_path).unwrap();
+}
+
+#[test]
+fn test_read_arrow_ipc_list() {
+    let temp_path = "/tmp/test_arrow_ipc_list.arrow";
+    let num_samples = 5;
+    let sample_size = 8;
+
+    // Create test data with List format
+    let mut list_builder = 
arrow::array::ListBuilder::new(Float64Array::builder(num_samples * 
sample_size));
+
+    for i in 0..num_samples {
+        let values: Vec<f64> = (0..sample_size).map(|j| (i * sample_size + j) 
as f64).collect();
+        list_builder.values().append_slice(&values);
+        list_builder.append(true);
+    }
+
+    let list_array = list_builder.finish();
+
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        "data",
+        DataType::List(Arc::new(Field::new("item", DataType::Float64, true))),
+        false,
+    )]));
+
+    let batch = arrow::record_batch::RecordBatch::try_new(
+        schema.clone(),
+        vec![Arc::new(list_array)],
+    )
+    .unwrap();
+
+    let file = File::create(temp_path).unwrap();
+    let mut writer = ArrowFileWriter::try_new(file, &schema).unwrap();
+    writer.write(&batch).unwrap();
+    writer.finish().unwrap();
+
+    // Read and verify
+    let (data, samples, size) = read_arrow_ipc_batch(temp_path).unwrap();
+
+    assert_eq!(samples, num_samples);
+    assert_eq!(size, sample_size);
+    assert_eq!(data.len(), num_samples * sample_size);
+
+    for (i, &val) in data.iter().enumerate() {
+        assert_eq!(val, i as f64);
+    }
+
+    // Cleanup
+    fs::remove_file(temp_path).unwrap();
+}
+
+#[test]
+fn test_arrow_ipc_inconsistent_sizes_fails() {
+    let temp_path = "/tmp/test_arrow_ipc_inconsistent.arrow";
+
+    // Create data with inconsistent sample sizes
+    let mut list_builder = 
arrow::array::ListBuilder::new(Float64Array::builder(20));
+
+    // First sample: 4 elements
+    list_builder.values().append_slice(&[1.0, 2.0, 3.0, 4.0]);
+    list_builder.append(true);
+
+    // Second sample: 8 elements (inconsistent!)
+    list_builder.values().append_slice(&[5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 
12.0]);
+    list_builder.append(true);
+
+    let list_array = list_builder.finish();
+
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        "data",
+        DataType::List(Arc::new(Field::new("item", DataType::Float64, true))),
+        false,
+    )]));
+
+    let batch = arrow::record_batch::RecordBatch::try_new(
+        schema.clone(),
+        vec![Arc::new(list_array)],
+    )
+    .unwrap();
+
+    let file = File::create(temp_path).unwrap();
+    let mut writer = ArrowFileWriter::try_new(file, &schema).unwrap();
+    writer.write(&batch).unwrap();
+    writer.finish().unwrap();
+
+    // Should fail due to inconsistent sizes
+    let result = read_arrow_ipc_batch(temp_path);
+    assert!(result.is_err());
+
+    // Cleanup
+    fs::remove_file(temp_path).unwrap();
+}
+
+#[test]
+fn test_arrow_ipc_empty_file_fails() {
+    let result = 
read_arrow_ipc_batch("/tmp/nonexistent_arrow_file_12345.arrow");
+    assert!(result.is_err());
+}
+
+#[test]
+fn test_arrow_ipc_large_batch() {
+    let temp_path = "/tmp/test_arrow_ipc_large.arrow";
+    let num_samples = 100;
+    let sample_size = 64;
+
+    // Create large dataset
+    let mut all_values = Vec::with_capacity(num_samples * sample_size);
+    for i in 0..num_samples {
+        for j in 0..sample_size {
+            all_values.push((i * sample_size + j) as f64 / (num_samples * 
sample_size) as f64);
+        }
+    }
+
+    // Write as FixedSizeList
+    let values_array = Float64Array::from(all_values.clone());
+    let field = Arc::new(Field::new("item", DataType::Float64, false));
+    let list_array = FixedSizeListArray::new(
+        field,
+        sample_size as i32,
+        Arc::new(values_array),
+        None,
+    );
+
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        "data",
+        DataType::FixedSizeList(
+            Arc::new(Field::new("item", DataType::Float64, false)),
+            sample_size as i32,
+        ),
+        false,
+    )]));
+
+    let batch = arrow::record_batch::RecordBatch::try_new(
+        schema.clone(),
+        vec![Arc::new(list_array)],
+    )
+    .unwrap();
+
+    let file = File::create(temp_path).unwrap();
+    let mut writer = ArrowFileWriter::try_new(file, &schema).unwrap();
+    writer.write(&batch).unwrap();
+    writer.finish().unwrap();
+
+    // Read and verify
+    let (data, samples, size) = read_arrow_ipc_batch(temp_path).unwrap();
+
+    assert_eq!(samples, num_samples);
+    assert_eq!(size, sample_size);
+    assert_eq!(data.len(), all_values.len());
+
+    for i in 0..data.len() {
+        assert!((data[i] - all_values[i]).abs() < 1e-10);
+    }
+
+    // Cleanup
+    fs::remove_file(temp_path).unwrap();
+}
diff --git a/qdp/qdp-python/src/lib.rs b/qdp/qdp-python/src/lib.rs
index cd3fcc3ac..340aae814 100644
--- a/qdp/qdp-python/src/lib.rs
+++ b/qdp/qdp-python/src/lib.rs
@@ -181,13 +181,7 @@ impl QdpEngine {
         })
     }
 
-    /// Encode from Parquet file (FASTEST - recommended for batches)
-    ///
-    /// Direct Parquet→GPU pipeline:
-    /// - Reads List<Float64> column format using Arrow
-    /// - Zero-copy data extraction
-    /// - Single optimized batch kernel launch
-    /// - Returns batched tensor (shape: [num_samples, 2^num_qubits])
+    /// Encode from Parquet file
     ///
     /// Args:
     ///     path: Path to Parquet file
@@ -209,6 +203,29 @@ impl QdpEngine {
             consumed: false,
         })
     }
+
+    /// Encode from Arrow IPC file
+    ///
+    /// Args:
+    ///     path: Path to Arrow IPC file (.arrow or .feather)
+    ///     num_qubits: Number of qubits for encoding
+    ///     encoding_method: Encoding strategy (currently only "amplitude")
+    ///
+    /// Returns:
+    ///     QuantumTensor: DLPack tensor containing all encoded states
+    ///
+    /// Example:
+    ///     >>> engine = QdpEngine(device_id=0)
+    ///     >>> batched = engine.encode_from_arrow_ipc("data.arrow", 16, 
"amplitude")
+    ///     >>> torch_tensor = torch.from_dlpack(batched)
+    fn encode_from_arrow_ipc(&self, path: &str, num_qubits: usize, 
encoding_method: &str) -> PyResult<QuantumTensor> {
+        let ptr = self.engine.encode_from_arrow_ipc(path, num_qubits, 
encoding_method)
+            .map_err(|e| PyRuntimeError::new_err(format!("Encoding from Arrow 
IPC failed: {}", e)))?;
+        Ok(QuantumTensor {
+            ptr,
+            consumed: false,
+        })
+    }
 }
 
 /// Mahout QDP Python module


Reply via email to