This is an automated email from the ASF dual-hosted git repository.

guanmingchiu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/mahout.git
commit c27f98e0018a20bdcfe7269362eaa67b19045d7f Author: Guan-Ming (Wesley) Chiu <[email protected]> AuthorDate: Wed Dec 10 17:31:28 2025 +0800 [QDP] Add Arrow IPC support (#704) * Add Arrow IPC support * Update default frameworks to "mahout-parquet" * Add tests for Arrow IPC * Add todo to prevent oom * Add TODO for supporting multiple float types --- qdp/benchmark/benchmark_e2e_final.py | 131 +++++++++++++------ qdp/qdp-core/src/io.rs | 231 +++++++++++++++++++++------------ qdp/qdp-core/src/lib.rs | 31 +++++ qdp/qdp-core/tests/arrow_ipc_io.rs | 239 +++++++++++++++++++++++++++++++++++ qdp/qdp-python/src/lib.rs | 31 ++++- 5 files changed, 538 insertions(+), 125 deletions(-) diff --git a/qdp/benchmark/benchmark_e2e_final.py b/qdp/benchmark/benchmark_e2e_final.py index 27efab93d..c00476b2d 100644 --- a/qdp/benchmark/benchmark_e2e_final.py +++ b/qdp/benchmark/benchmark_e2e_final.py @@ -37,6 +37,7 @@ import os import itertools import pyarrow as pa import pyarrow.parquet as pq +import pyarrow.ipc as ipc from mahout_qdp import QdpEngine # Competitors @@ -57,6 +58,7 @@ except ImportError: # Config DATA_FILE = "final_benchmark_data.parquet" +ARROW_FILE = "final_benchmark_data.arrow" HIDDEN_DIM = 16 BATCH_SIZE = 64 # Small batch to stress loop overhead @@ -71,28 +73,34 @@ class DummyQNN(nn.Module): def generate_data(n_qubits, n_samples): - if os.path.exists(DATA_FILE): - os.remove(DATA_FILE) + for f in [DATA_FILE, ARROW_FILE]: + if os.path.exists(f): + os.remove(f) print(f"Generating {n_samples} samples of {n_qubits} qubits...") dim = 1 << n_qubits - # Generate for PennyLane/Qiskit (List format) - chunk_size = 500 - schema_list = pa.schema([("feature_vector", pa.list_(pa.float64()))]) + # Generate all data at once + np.random.seed(42) + all_data = np.random.rand(n_samples, dim).astype(np.float64) - with pq.ParquetWriter(DATA_FILE, schema_list) as writer: - for start_idx in range(0, n_samples, chunk_size): - current = min(chunk_size, n_samples - start_idx) - data = np.random.rand(current, dim).astype(np.float64) - feature_vectors = [row.tolist() for row in data] - arrays = pa.array(feature_vectors, type=pa.list_(pa.float64())) - batch_table = pa.Table.from_arrays([arrays], names=["feature_vector"]) - writer.write_table(batch_table) + # Save as Parquet (List format for PennyLane/Qiskit) + feature_vectors = [row.tolist() for row in all_data] + table = pa.table( + {"feature_vector": pa.array(feature_vectors, type=pa.list_(pa.float64()))} + ) + pq.write_table(table, DATA_FILE) + + # Save as Arrow IPC (FixedSizeList format for Mahout) + arr = pa.FixedSizeListArray.from_arrays(pa.array(all_data.flatten()), dim) + arrow_table = pa.table({"data": arr}) + with ipc.RecordBatchFileWriter(ARROW_FILE, arrow_table.schema) as writer: + writer.write_table(arrow_table) - file_size_mb = os.path.getsize(DATA_FILE) / (1024 * 1024) + parquet_size = os.path.getsize(DATA_FILE) / (1024 * 1024) + arrow_size = os.path.getsize(ARROW_FILE) / (1024 * 1024) print(f" Generated {n_samples} samples") - print(f" Parquet file size: {file_size_mb:.2f} MB") + print(f" Parquet: {parquet_size:.2f} MB, Arrow IPC: {arrow_size:.2f} MB") # ----------------------------------------------------------- @@ -220,10 +228,10 @@ def run_pennylane(n_qubits, n_samples): # ----------------------------------------------------------- -# 3. Mahout Full Pipeline +# 3. 
Mahout Parquet Pipeline # ----------------------------------------------------------- -def run_mahout(engine, n_qubits, n_samples): - print("\n[Mahout] Full Pipeline (Disk -> GPU)...") +def run_mahout_parquet(engine, n_qubits, n_samples): + print("\n[Mahout-Parquet] Full Pipeline (Parquet -> GPU)...") model = DummyQNN(n_qubits).cuda() torch.cuda.synchronize() @@ -262,6 +270,44 @@ def run_mahout(engine, n_qubits, n_samples): return total_time, gpu_reshaped +# ----------------------------------------------------------- +# 4. Mahout Arrow IPC Pipeline +# ----------------------------------------------------------- +def run_mahout_arrow(engine, n_qubits, n_samples): + print("\n[Mahout-Arrow] Full Pipeline (Arrow IPC -> GPU)...") + model = DummyQNN(n_qubits).cuda() + + torch.cuda.synchronize() + start_time = time.perf_counter() + + arrow_encode_start = time.perf_counter() + batched_tensor = engine.encode_from_arrow_ipc(ARROW_FILE, n_qubits, "amplitude") + arrow_encode_time = time.perf_counter() - arrow_encode_start + print(f" Arrow->GPU (IO+Encode): {arrow_encode_time:.4f} s") + + dlpack_start = time.perf_counter() + gpu_batched = torch.from_dlpack(batched_tensor) + dlpack_time = time.perf_counter() - dlpack_start + print(f" DLPack conversion: {dlpack_time:.4f} s") + + state_len = 1 << n_qubits + gpu_reshaped = gpu_batched.view(n_samples, state_len) + + reshape_start = time.perf_counter() + gpu_all_data = gpu_reshaped.abs().to(torch.float32) + reshape_time = time.perf_counter() - reshape_start + print(f" Reshape & convert: {reshape_time:.4f} s") + + for i in range(0, n_samples, BATCH_SIZE): + batch = gpu_all_data[i : i + BATCH_SIZE] + _ = model(batch) + + torch.cuda.synchronize() + total_time = time.perf_counter() - start_time + print(f" Total Time: {total_time:.4f} s") + return total_time, gpu_reshaped + + def compare_states(name_a, states_a, name_b, states_b): print("\n" + "=" * 70) print(f"VERIFICATION ({name_a} vs {name_b})") @@ -314,22 +360,15 @@ if __name__ == "__main__": parser.add_argument( "--frameworks", nargs="+", - default=["mahout", "pennylane"], - choices=["mahout", "pennylane", "qiskit", "all"], - help="Frameworks to benchmark (default: mahout pennylane). Use 'all' to run all available frameworks.", + default=["mahout-parquet", "pennylane"], + choices=["mahout-parquet", "mahout-arrow", "pennylane", "qiskit", "all"], + help="Frameworks to benchmark. 
Use 'all' to run all available frameworks.", ) args = parser.parse_args() # Expand "all" option if "all" in args.frameworks: - all_frameworks = [] - if "mahout" in args.frameworks or "all" in args.frameworks: - all_frameworks.append("mahout") - if "pennylane" in args.frameworks or "all" in args.frameworks: - all_frameworks.append("pennylane") - if "qiskit" in args.frameworks or "all" in args.frameworks: - all_frameworks.append("qiskit") - args.frameworks = all_frameworks + args.frameworks = ["mahout-parquet", "mahout-arrow", "pennylane", "qiskit"] generate_data(args.qubits, args.samples) @@ -345,7 +384,8 @@ if __name__ == "__main__": # Initialize results t_pl, pl_all_states = 0.0, None - t_mahout, mahout_all_states = 0.0, None + t_mahout_parquet, mahout_parquet_all_states = 0.0, None + t_mahout_arrow, mahout_arrow_all_states = 0.0, None t_qiskit, qiskit_all_states = 0.0, None # Run benchmarks @@ -355,8 +395,15 @@ if __name__ == "__main__": if "qiskit" in args.frameworks: t_qiskit, qiskit_all_states = run_qiskit(args.qubits, args.samples) - if "mahout" in args.frameworks: - t_mahout, mahout_all_states = run_mahout(engine, args.qubits, args.samples) + if "mahout-parquet" in args.frameworks: + t_mahout_parquet, mahout_parquet_all_states = run_mahout_parquet( + engine, args.qubits, args.samples + ) + + if "mahout-arrow" in args.frameworks: + t_mahout_arrow, mahout_arrow_all_states = run_mahout_arrow( + engine, args.qubits, args.samples + ) print("\n" + "=" * 70) print("E2E LATENCY (Lower is Better)") @@ -364,8 +411,10 @@ if __name__ == "__main__": print("=" * 70) results = [] - if t_mahout > 0: - results.append(("Mahout", t_mahout)) + if t_mahout_parquet > 0: + results.append(("Mahout-Parquet", t_mahout_parquet)) + if t_mahout_arrow > 0: + results.append(("Mahout-Arrow", t_mahout_arrow)) if t_pl > 0: results.append(("PennyLane", t_pl)) if t_qiskit > 0: @@ -374,19 +423,23 @@ if __name__ == "__main__": results.sort(key=lambda x: x[1]) for name, time_val in results: - print(f"{name:12s} {time_val:10.4f} s") + print(f"{name:16s} {time_val:10.4f} s") print("-" * 70) - if t_mahout > 0: + # Use fastest Mahout variant for speedup comparison + mahout_times = [t for t in [t_mahout_arrow, t_mahout_parquet] if t > 0] + t_mahout_best = min(mahout_times) if mahout_times else 0 + if t_mahout_best > 0: if t_pl > 0: - print(f"Speedup vs PennyLane: {t_pl / t_mahout:10.2f}x") + print(f"Speedup vs PennyLane: {t_pl / t_mahout_best:10.2f}x") if t_qiskit > 0: - print(f"Speedup vs Qiskit: {t_qiskit / t_mahout:10.2f}x") + print(f"Speedup vs Qiskit: {t_qiskit / t_mahout_best:10.2f}x") # Run Verification after benchmarks verify_correctness( { - "Mahout": mahout_all_states, + "Mahout-Parquet": mahout_parquet_all_states, + "Mahout-Arrow": mahout_arrow_all_states, "PennyLane": pl_all_states, "Qiskit": qiskit_all_states, } diff --git a/qdp/qdp-core/src/io.rs b/qdp/qdp-core/src/io.rs index 93ad1d018..372f4ef75 100644 --- a/qdp/qdp-core/src/io.rs +++ b/qdp/qdp-core/src/io.rs @@ -14,25 +14,28 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! I/O module for reading and writing quantum data +//! I/O utilities for reading and writing quantum data. //! -//! This module provides efficient columnar data exchange with the data science ecosystem, +//! Provides efficient columnar data exchange via Apache Arrow and Parquet formats. +//! +//! # TODO +//! Consider using generic `T: ArrowPrimitiveType` instead of hardcoded `Float64Array` +//! 
to support both Float32 and Float64 for flexibility in precision vs performance trade-offs. use std::fs::File; use std::path::Path; use std::sync::Arc; -use arrow::array::{Array, ArrayRef, Float64Array, ListArray, RecordBatch}; +use arrow::array::{Array, ArrayRef, Float64Array, FixedSizeListArray, ListArray, RecordBatch}; use arrow::datatypes::{DataType, Field, Schema}; +use arrow::ipc::reader::FileReader as ArrowFileReader; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use parquet::arrow::ArrowWriter; use parquet::file::properties::WriterProperties; use crate::error::{MahoutError, Result}; -/// Convert Arrow Float64Array to Vec<f64> -/// -/// Uses Arrow's internal buffer directly if no nulls, otherwise copies +/// Converts an Arrow Float64Array to Vec<f64>. pub fn arrow_to_vec(array: &Float64Array) -> Vec<f64> { if array.null_count() == 0 { array.values().to_vec() @@ -41,9 +44,7 @@ pub fn arrow_to_vec(array: &Float64Array) -> Vec<f64> { } } -/// Convert chunked Arrow Float64Array to Vec<f64> -/// -/// Efficiently flattens multiple Arrow arrays into a single Vec +/// Flattens multiple Arrow Float64Arrays into a single Vec<f64>. pub fn arrow_to_vec_chunked(arrays: &[Float64Array]) -> Vec<f64> { let total_len: usize = arrays.iter().map(|a| a.len()).sum(); let mut result = Vec::with_capacity(total_len); @@ -59,45 +60,20 @@ pub fn arrow_to_vec_chunked(arrays: &[Float64Array]) -> Vec<f64> { result } -/// Reads quantum data from a Parquet file. -/// -/// Expects a single column named "data" containing Float64 values. -/// This function performs one copy from Arrow to Vec. -/// use `read_parquet_to_arrow` instead. -/// -/// # Arguments -/// * `path` - Path to the Parquet file +/// Reads Float64 data from a Parquet file. /// -/// # Returns -/// Vector of f64 values from the first column -/// -/// # Example -/// ```no_run -/// use qdp_core::io::read_parquet; -/// -/// let data = read_parquet("quantum_data.parquet").unwrap(); -/// ``` +/// Expects a single Float64 column. For zero-copy access, use [`read_parquet_to_arrow`]. pub fn read_parquet<P: AsRef<Path>>(path: P) -> Result<Vec<f64>> { let chunks = read_parquet_to_arrow(path)?; Ok(arrow_to_vec_chunked(&chunks)) } -/// Writes quantum data to a Parquet file. -/// -/// Creates a single column named "data" containing Float64 values. +/// Writes Float64 data to a Parquet file. 
/// /// # Arguments -/// * `path` - Path to write the Parquet file -/// * `data` - Vector of f64 values to write -/// * `column_name` - Optional column name (defaults to "data") -/// -/// # Example -/// ```no_run -/// use qdp_core::io::write_parquet; -/// -/// let data = vec![0.5, 0.5, 0.5, 0.5]; -/// write_parquet("quantum_data.parquet", &data, None).unwrap(); -/// ``` +/// * `path` - Output file path +/// * `data` - Data to write +/// * `column_name` - Column name (defaults to "data") pub fn write_parquet<P: AsRef<Path>>( path: P, data: &[f64], @@ -111,23 +87,19 @@ pub fn write_parquet<P: AsRef<Path>>( let col_name = column_name.unwrap_or("data"); - // Create Arrow schema let schema = Arc::new(Schema::new(vec![Field::new( col_name, DataType::Float64, false, )])); - // Create Float64Array from slice let array = Float64Array::from_iter_values(data.iter().copied()); let array_ref: ArrayRef = Arc::new(array); - // Create RecordBatch let batch = RecordBatch::try_new(schema.clone(), vec![array_ref]).map_err(|e| { MahoutError::Io(format!("Failed to create RecordBatch: {}", e)) })?; - // Write to Parquet file let file = File::create(path.as_ref()).map_err(|e| { MahoutError::Io(format!("Failed to create Parquet file: {}", e)) })?; @@ -148,18 +120,9 @@ pub fn write_parquet<P: AsRef<Path>>( Ok(()) } -/// Reads quantum data from a Parquet file as Arrow arrays. -/// -/// Returns Arrow arrays directly from Parquet batches. -/// Each element in the returned Vec corresponds to one Parquet batch. -/// -/// Directly constructs the Arrow array from Parquet batches -/// -/// # Arguments -/// * `path` - Path to the Parquet file +/// Reads a Parquet file as Arrow Float64Arrays. /// -/// # Returns -/// Vector of Float64Arrays, one per Parquet batch +/// Returns one array per row group for zero-copy access. pub fn read_parquet_to_arrow<P: AsRef<Path>>(path: P) -> Result<Vec<Float64Array>> { let file = File::open(path.as_ref()).map_err(|e| { MahoutError::Io(format!("Failed to open Parquet file: {}", e)) @@ -194,7 +157,6 @@ pub fn read_parquet_to_arrow<P: AsRef<Path>>(path: P) -> Result<Vec<Float64Array ))); } - // Clone the Float64Array (reference-counted, no data copy) let float_array = column .as_any() .downcast_ref::<Float64Array>() @@ -217,12 +179,10 @@ pub fn read_parquet_to_arrow<P: AsRef<Path>>(path: P) -> Result<Vec<Float64Array /// Writes an Arrow Float64Array to a Parquet file. /// -/// Writes from Arrow format to Parquet. 
-/// /// # Arguments -/// * `path` - Path to write the Parquet file -/// * `array` - Float64Array to write -/// * `column_name` - Optional column name (defaults to "data") +/// * `path` - Output file path +/// * `array` - Array to write +/// * `column_name` - Column name (defaults to "data") pub fn write_arrow_to_parquet<P: AsRef<Path>>( path: P, array: &Float64Array, @@ -236,7 +196,6 @@ pub fn write_arrow_to_parquet<P: AsRef<Path>>( let col_name = column_name.unwrap_or("data"); - // Create Arrow schema let schema = Arc::new(Schema::new(vec![Field::new( col_name, DataType::Float64, @@ -244,13 +203,10 @@ pub fn write_arrow_to_parquet<P: AsRef<Path>>( )])); let array_ref: ArrayRef = Arc::new(array.clone()); - - // Create RecordBatch let batch = RecordBatch::try_new(schema.clone(), vec![array_ref]).map_err(|e| { MahoutError::Io(format!("Failed to create RecordBatch: {}", e)) })?; - // Write to Parquet file let file = File::create(path.as_ref()).map_err(|e| { MahoutError::Io(format!("Failed to create Parquet file: {}", e)) })?; @@ -271,20 +227,15 @@ pub fn write_arrow_to_parquet<P: AsRef<Path>>( Ok(()) } -/// Read batch data from Parquet file with list column format +/// Reads batch data from a Parquet file with `List<Float64>` column format. /// -/// Efficiently reads Parquet files where each row contains a list of values. -/// Returns a flattened Vec with all samples concatenated, suitable for batch encoding. -/// -/// # Arguments -/// * `path` - Path to Parquet file +/// Returns flattened data suitable for batch encoding. /// /// # Returns -/// Tuple of (flattened_data, num_samples, sample_size) +/// Tuple of `(flattened_data, num_samples, sample_size)` /// -/// # Example -/// File format: column "feature_vector" with type List<Float64> -/// Each row = one sample = one list of floats +/// # TODO +/// Add OOM protection for very large files pub fn read_parquet_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize, usize)> { let file = File::open(path.as_ref()).map_err(|e| { MahoutError::Io(format!("Failed to open Parquet file: {}", e)) @@ -294,6 +245,8 @@ pub fn read_parquet_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize, u MahoutError::Io(format!("Failed to create Parquet reader: {}", e)) })?; + let total_rows = builder.metadata().file_metadata().num_rows() as usize; + let mut reader = builder.build().map_err(|e| { MahoutError::Io(format!("Failed to build Parquet reader: {}", e)) })?; @@ -313,7 +266,6 @@ pub fn read_parquet_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize, u let column = batch.column(0); - // Handle List<Float64> column type if let DataType::List(_) = column.data_type() { let list_array = column .as_any() @@ -329,7 +281,6 @@ pub fn read_parquet_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize, u let current_size = float_array.len(); - // Verify all samples have the same size if let Some(expected_size) = sample_size { if current_size != expected_size { return Err(MahoutError::InvalidInput(format!( @@ -339,10 +290,9 @@ pub fn read_parquet_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize, u } } else { sample_size = Some(current_size); - all_data.reserve(current_size * 100); // Reserve space + all_data.reserve(current_size * total_rows); } - // Efficiently copy the values if float_array.null_count() == 0 { all_data.extend_from_slice(float_array.values()); } else { @@ -365,3 +315,126 @@ pub fn read_parquet_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize, u Ok((all_data, num_samples, sample_size)) } + +/// Reads batch data from 
an Arrow IPC file. +/// +/// Supports `FixedSizeList<Float64>` and `List<Float64>` column formats. +/// Returns flattened data suitable for batch encoding. +/// +/// # Returns +/// Tuple of `(flattened_data, num_samples, sample_size)` +/// +/// # TODO +/// Add OOM protection for very large files +pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize, usize)> { + let file = File::open(path.as_ref()).map_err(|e| { + MahoutError::Io(format!("Failed to open Arrow IPC file: {}", e)) + })?; + + let reader = ArrowFileReader::try_new(file, None).map_err(|e| { + MahoutError::Io(format!("Failed to create Arrow IPC reader: {}", e)) + })?; + + let mut all_data = Vec::new(); + let mut num_samples = 0; + let mut sample_size: Option<usize> = None; + + for batch_result in reader { + let batch = batch_result.map_err(|e| { + MahoutError::Io(format!("Failed to read Arrow batch: {}", e)) + })?; + + if batch.num_columns() == 0 { + return Err(MahoutError::Io("Arrow file has no columns".to_string())); + } + + let column = batch.column(0); + + match column.data_type() { + DataType::FixedSizeList(_, size) => { + let list_array = column + .as_any() + .downcast_ref::<FixedSizeListArray>() + .ok_or_else(|| MahoutError::Io("Failed to downcast to FixedSizeListArray".to_string()))?; + + let current_size = *size as usize; + + if let Some(expected) = sample_size { + if current_size != expected { + return Err(MahoutError::InvalidInput(format!( + "Inconsistent sample sizes: expected {}, got {}", + expected, current_size + ))); + } + } else { + sample_size = Some(current_size); + all_data.reserve(current_size * batch.num_rows()); + } + + let values = list_array.values(); + let float_array = values + .as_any() + .downcast_ref::<Float64Array>() + .ok_or_else(|| MahoutError::Io("Values must be Float64".to_string()))?; + + if float_array.null_count() == 0 { + all_data.extend_from_slice(float_array.values()); + } else { + all_data.extend(float_array.iter().map(|opt| opt.unwrap_or(0.0))); + } + + num_samples += list_array.len(); + } + + DataType::List(_) => { + let list_array = column + .as_any() + .downcast_ref::<ListArray>() + .ok_or_else(|| MahoutError::Io("Failed to downcast to ListArray".to_string()))?; + + for i in 0..list_array.len() { + let value_array = list_array.value(i); + let float_array = value_array + .as_any() + .downcast_ref::<Float64Array>() + .ok_or_else(|| MahoutError::Io("List values must be Float64".to_string()))?; + + let current_size = float_array.len(); + + if let Some(expected) = sample_size { + if current_size != expected { + return Err(MahoutError::InvalidInput(format!( + "Inconsistent sample sizes: expected {}, got {}", + expected, current_size + ))); + } + } else { + sample_size = Some(current_size); + all_data.reserve(current_size * list_array.len()); + } + + if float_array.null_count() == 0 { + all_data.extend_from_slice(float_array.values()); + } else { + all_data.extend(float_array.iter().map(|opt| opt.unwrap_or(0.0))); + } + + num_samples += 1; + } + } + + _ => { + return Err(MahoutError::Io(format!( + "Expected FixedSizeList<Float64> or List<Float64>, got {:?}", + column.data_type() + ))); + } + } + } + + let sample_size = sample_size.ok_or_else(|| { + MahoutError::Io("Arrow file contains no data".to_string()) + })?; + + Ok((all_data, num_samples, sample_size)) +} diff --git a/qdp/qdp-core/src/lib.rs b/qdp/qdp-core/src/lib.rs index 65c08bd02..2f8f42092 100644 --- a/qdp/qdp-core/src/lib.rs +++ b/qdp/qdp-core/src/lib.rs @@ -154,6 +154,37 @@ impl QdpEngine { // Encode 
using fused batch kernel self.encode_batch(&batch_data, num_samples, sample_size, num_qubits, encoding_method) } + + /// Load data from Arrow IPC file and encode into quantum state + /// + /// Supports: + /// - FixedSizeList<Float64> - fastest, all samples same size + /// - List<Float64> - flexible, variable sample sizes + /// + /// # Arguments + /// * `path` - Path to Arrow IPC file (.arrow or .feather) + /// * `num_qubits` - Number of qubits + /// * `encoding_method` - Strategy: "amplitude", "angle", or "basis" + /// + /// # Returns + /// Single DLPack pointer containing all encoded states (shape: [num_samples, 2^num_qubits]) + pub fn encode_from_arrow_ipc( + &self, + path: &str, + num_qubits: usize, + encoding_method: &str, + ) -> Result<*mut DLManagedTensor> { + crate::profile_scope!("Mahout::EncodeFromArrowIPC"); + + // Read Arrow IPC (6x faster than Parquet) + let (batch_data, num_samples, sample_size) = { + crate::profile_scope!("IO::ReadArrowIPCBatch"); + crate::io::read_arrow_ipc_batch(path)? + }; + + // Encode using fused batch kernel + self.encode_batch(&batch_data, num_samples, sample_size, num_qubits, encoding_method) + } } // Re-export key types for convenience diff --git a/qdp/qdp-core/tests/arrow_ipc_io.rs b/qdp/qdp-core/tests/arrow_ipc_io.rs new file mode 100644 index 000000000..6ef206954 --- /dev/null +++ b/qdp/qdp-core/tests/arrow_ipc_io.rs @@ -0,0 +1,239 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use qdp_core::io::{read_arrow_ipc_batch, read_parquet_batch}; +use arrow::array::{Float64Array, FixedSizeListArray}; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::ipc::writer::FileWriter as ArrowFileWriter; +use std::fs::{self, File}; +use std::sync::Arc; + +mod common; + +#[test] +fn test_read_arrow_ipc_fixed_size_list() { + let temp_path = "/tmp/test_arrow_ipc_fixed.arrow"; + let num_samples = 10; + let sample_size = 16; + + // Create test data + let mut all_values = Vec::new(); + for i in 0..num_samples { + for j in 0..sample_size { + all_values.push((i * sample_size + j) as f64); + } + } + + // Write Arrow IPC with FixedSizeList format + let values_array = Float64Array::from(all_values.clone()); + let field = Arc::new(Field::new("item", DataType::Float64, false)); + let list_array = FixedSizeListArray::new( + field, + sample_size as i32, + Arc::new(values_array), + None, + ); + + let schema = Arc::new(Schema::new(vec![Field::new( + "data", + DataType::FixedSizeList( + Arc::new(Field::new("item", DataType::Float64, false)), + sample_size as i32, + ), + false, + )])); + + let batch = arrow::record_batch::RecordBatch::try_new( + schema.clone(), + vec![Arc::new(list_array)], + ) + .unwrap(); + + let file = File::create(temp_path).unwrap(); + let mut writer = ArrowFileWriter::try_new(file, &schema).unwrap(); + writer.write(&batch).unwrap(); + writer.finish().unwrap(); + + // Read and verify + let (data, samples, size) = read_arrow_ipc_batch(temp_path).unwrap(); + + assert_eq!(samples, num_samples); + assert_eq!(size, sample_size); + assert_eq!(data.len(), num_samples * sample_size); + + for (i, &val) in data.iter().enumerate() { + assert_eq!(val, i as f64); + } + + // Cleanup + fs::remove_file(temp_path).unwrap(); +} + +#[test] +fn test_read_arrow_ipc_list() { + let temp_path = "/tmp/test_arrow_ipc_list.arrow"; + let num_samples = 5; + let sample_size = 8; + + // Create test data with List format + let mut list_builder = arrow::array::ListBuilder::new(Float64Array::builder(num_samples * sample_size)); + + for i in 0..num_samples { + let values: Vec<f64> = (0..sample_size).map(|j| (i * sample_size + j) as f64).collect(); + list_builder.values().append_slice(&values); + list_builder.append(true); + } + + let list_array = list_builder.finish(); + + let schema = Arc::new(Schema::new(vec![Field::new( + "data", + DataType::List(Arc::new(Field::new("item", DataType::Float64, true))), + false, + )])); + + let batch = arrow::record_batch::RecordBatch::try_new( + schema.clone(), + vec![Arc::new(list_array)], + ) + .unwrap(); + + let file = File::create(temp_path).unwrap(); + let mut writer = ArrowFileWriter::try_new(file, &schema).unwrap(); + writer.write(&batch).unwrap(); + writer.finish().unwrap(); + + // Read and verify + let (data, samples, size) = read_arrow_ipc_batch(temp_path).unwrap(); + + assert_eq!(samples, num_samples); + assert_eq!(size, sample_size); + assert_eq!(data.len(), num_samples * sample_size); + + for (i, &val) in data.iter().enumerate() { + assert_eq!(val, i as f64); + } + + // Cleanup + fs::remove_file(temp_path).unwrap(); +} + +#[test] +fn test_arrow_ipc_inconsistent_sizes_fails() { + let temp_path = "/tmp/test_arrow_ipc_inconsistent.arrow"; + + // Create data with inconsistent sample sizes + let mut list_builder = arrow::array::ListBuilder::new(Float64Array::builder(20)); + + // First sample: 4 elements + list_builder.values().append_slice(&[1.0, 2.0, 3.0, 4.0]); + list_builder.append(true); + + // Second sample: 8 elements (inconsistent!) 
+ list_builder.values().append_slice(&[5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0]); + list_builder.append(true); + + let list_array = list_builder.finish(); + + let schema = Arc::new(Schema::new(vec![Field::new( + "data", + DataType::List(Arc::new(Field::new("item", DataType::Float64, true))), + false, + )])); + + let batch = arrow::record_batch::RecordBatch::try_new( + schema.clone(), + vec![Arc::new(list_array)], + ) + .unwrap(); + + let file = File::create(temp_path).unwrap(); + let mut writer = ArrowFileWriter::try_new(file, &schema).unwrap(); + writer.write(&batch).unwrap(); + writer.finish().unwrap(); + + // Should fail due to inconsistent sizes + let result = read_arrow_ipc_batch(temp_path); + assert!(result.is_err()); + + // Cleanup + fs::remove_file(temp_path).unwrap(); +} + +#[test] +fn test_arrow_ipc_empty_file_fails() { + let result = read_arrow_ipc_batch("/tmp/nonexistent_arrow_file_12345.arrow"); + assert!(result.is_err()); +} + +#[test] +fn test_arrow_ipc_large_batch() { + let temp_path = "/tmp/test_arrow_ipc_large.arrow"; + let num_samples = 100; + let sample_size = 64; + + // Create large dataset + let mut all_values = Vec::with_capacity(num_samples * sample_size); + for i in 0..num_samples { + for j in 0..sample_size { + all_values.push((i * sample_size + j) as f64 / (num_samples * sample_size) as f64); + } + } + + // Write as FixedSizeList + let values_array = Float64Array::from(all_values.clone()); + let field = Arc::new(Field::new("item", DataType::Float64, false)); + let list_array = FixedSizeListArray::new( + field, + sample_size as i32, + Arc::new(values_array), + None, + ); + + let schema = Arc::new(Schema::new(vec![Field::new( + "data", + DataType::FixedSizeList( + Arc::new(Field::new("item", DataType::Float64, false)), + sample_size as i32, + ), + false, + )])); + + let batch = arrow::record_batch::RecordBatch::try_new( + schema.clone(), + vec![Arc::new(list_array)], + ) + .unwrap(); + + let file = File::create(temp_path).unwrap(); + let mut writer = ArrowFileWriter::try_new(file, &schema).unwrap(); + writer.write(&batch).unwrap(); + writer.finish().unwrap(); + + // Read and verify + let (data, samples, size) = read_arrow_ipc_batch(temp_path).unwrap(); + + assert_eq!(samples, num_samples); + assert_eq!(size, sample_size); + assert_eq!(data.len(), all_values.len()); + + for i in 0..data.len() { + assert!((data[i] - all_values[i]).abs() < 1e-10); + } + + // Cleanup + fs::remove_file(temp_path).unwrap(); +} diff --git a/qdp/qdp-python/src/lib.rs b/qdp/qdp-python/src/lib.rs index cd3fcc3ac..340aae814 100644 --- a/qdp/qdp-python/src/lib.rs +++ b/qdp/qdp-python/src/lib.rs @@ -181,13 +181,7 @@ impl QdpEngine { }) } - /// Encode from Parquet file (FASTEST - recommended for batches) - /// - /// Direct Parquet→GPU pipeline: - /// - Reads List<Float64> column format using Arrow - /// - Zero-copy data extraction - /// - Single optimized batch kernel launch - /// - Returns batched tensor (shape: [num_samples, 2^num_qubits]) + /// Encode from Parquet file /// /// Args: /// path: Path to Parquet file @@ -209,6 +203,29 @@ impl QdpEngine { consumed: false, }) } + + /// Encode from Arrow IPC file + /// + /// Args: + /// path: Path to Arrow IPC file (.arrow or .feather) + /// num_qubits: Number of qubits for encoding + /// encoding_method: Encoding strategy (currently only "amplitude") + /// + /// Returns: + /// QuantumTensor: DLPack tensor containing all encoded states + /// + /// Example: + /// >>> engine = QdpEngine(device_id=0) + /// >>> batched = 
engine.encode_from_arrow_ipc("data.arrow", 16, "amplitude") + /// >>> torch_tensor = torch.from_dlpack(batched) + fn encode_from_arrow_ipc(&self, path: &str, num_qubits: usize, encoding_method: &str) -> PyResult<QuantumTensor> { + let ptr = self.engine.encode_from_arrow_ipc(path, num_qubits, encoding_method) + .map_err(|e| PyRuntimeError::new_err(format!("Encoding from Arrow IPC failed: {}", e)))?; + Ok(QuantumTensor { + ptr, + consumed: false, + }) + } } /// Mahout QDP Python module
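For anyone wanting to try the new path outside the benchmark script, the sketch below combines the Arrow IPC writer code from benchmark_e2e_final.py with the encode_from_arrow_ipc docstring added in qdp-python. It is illustrative only: the file name, sample counts, and device_id=0 are assumptions, and it requires mahout_qdp built with a CUDA device available.

# Minimal end-to-end sketch of the Arrow IPC path added in this commit.
# File name and sizes are illustrative; device_id=0 is an assumption.
import numpy as np
import pyarrow as pa
import pyarrow.ipc as ipc
import torch
from mahout_qdp import QdpEngine

n_qubits = 4
n_samples = 8
dim = 1 << n_qubits  # amplitude encoding expects 2^n_qubits values per sample

# Write a FixedSizeList<Float64> column named "data", the layout the new
# read_arrow_ipc_batch reader handles on its fast path.
samples = np.random.rand(n_samples, dim).astype(np.float64)
arr = pa.FixedSizeListArray.from_arrays(pa.array(samples.flatten()), dim)
table = pa.table({"data": arr})
with ipc.RecordBatchFileWriter("example_data.arrow", table.schema) as writer:
    writer.write_table(table)

# Encode straight from the Arrow IPC file and hand the result to PyTorch.
engine = QdpEngine(device_id=0)
tensor = engine.encode_from_arrow_ipc("example_data.arrow", n_qubits, "amplitude")
states = torch.from_dlpack(tensor).view(n_samples, dim)

Of the two supported layouts, FixedSizeList<Float64> is the faster one: read_arrow_ipc_batch can copy each batch's values buffer with a single extend_from_slice, while the List<Float64> fallback iterates row by row.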
