This is an automated email from the ASF dual-hosted git repository.

guanmingchiu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/mahout.git


The following commit(s) were added to refs/heads/main by this push:
     new be4da0203 [QDP] Add Parquet streaming support for angle encoding (#898)
be4da0203 is described below

commit be4da020349aab0ad823ceb4d4a3a1708c946ca0
Author: Jie-Kai Chang <[email protected]>
AuthorDate: Thu Jan 22 13:40:51 2026 +0800

    [QDP] Add Parquet streaming support for angle encoding (#898)
    
    * streaming angle encoder
    
    Signed-off-by: 400Ping <[email protected]>
    
    * fix pre-commit
    
    Signed-off-by: 400Ping <[email protected]>
    
    ---------
    
    Signed-off-by: 400Ping <[email protected]>
---
 qdp/qdp-core/src/encoding/angle.rs | 125 +++++++++++++++++++++++++++++++++++++
 qdp/qdp-core/src/encoding/mod.rs   |   5 ++
 qdp/qdp-core/src/lib.rs            |   4 +-
 3 files changed, 132 insertions(+), 2 deletions(-)

diff --git a/qdp/qdp-core/src/encoding/angle.rs 
b/qdp/qdp-core/src/encoding/angle.rs
new file mode 100644
index 000000000..a9f5e5bc8
--- /dev/null
+++ b/qdp/qdp-core/src/encoding/angle.rs
@@ -0,0 +1,125 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Angle encoding implementation.
+
+use std::ffi::c_void;
+
+use qdp_kernels::launch_angle_encode_batch;
+
+use super::{ChunkEncoder, STAGE_SIZE_ELEMENTS};
+use crate::gpu::PipelineContext;
+use crate::gpu::memory::PinnedHostBuffer;
+use crate::{MahoutError, QdpEngine, Result};
+
+/// Angle encoder state (no persistent buffers required).
+pub(crate) struct AngleEncoderState;
+
+/// Angle encoding: maps per-qubit angles to product state amplitudes.
+pub(crate) struct AngleEncoder;
+
+impl ChunkEncoder for AngleEncoder {
+    type State = AngleEncoderState;
+
+    fn validate_sample_size(&self, sample_size: usize) -> Result<()> {
+        if sample_size == 0 {
+            return Err(MahoutError::InvalidInput(
+                "Angle encoding requires sample_size > 0".into(),
+            ));
+        }
+        if sample_size > STAGE_SIZE_ELEMENTS {
+            return Err(MahoutError::InvalidInput(format!(
+                "Sample size {} exceeds staging buffer capacity {}",
+                sample_size, STAGE_SIZE_ELEMENTS
+            )));
+        }
+        Ok(())
+    }
+
+    fn init_state(
+        &self,
+        _engine: &QdpEngine,
+        sample_size: usize,
+        num_qubits: usize,
+    ) -> Result<Self::State> {
+        if num_qubits == 0 || num_qubits > 30 {
+            return Err(MahoutError::InvalidInput(format!(
+                "Number of qubits {} must be between 1 and 30",
+                num_qubits
+            )));
+        }
+        if sample_size != num_qubits {
+            return Err(MahoutError::InvalidInput(format!(
+                "Angle encoding expects sample_size={} (one angle per qubit), 
got {}",
+                num_qubits, sample_size
+            )));
+        }
+        Ok(AngleEncoderState)
+    }
+
+    fn encode_chunk(
+        &self,
+        _state: &mut Self::State,
+        _engine: &QdpEngine,
+        ctx: &PipelineContext,
+        host_buffer: &PinnedHostBuffer,
+        dev_ptr: u64,
+        samples_in_chunk: usize,
+        sample_size: usize,
+        state_ptr_offset: *mut c_void,
+        state_len: usize,
+        num_qubits: usize,
+        global_sample_offset: usize,
+    ) -> Result<()> {
+        let total_values = 
samples_in_chunk.checked_mul(sample_size).ok_or_else(|| {
+            MahoutError::MemoryAllocation(format!(
+                "Angle chunk size overflow: {} * {}",
+                samples_in_chunk, sample_size
+            ))
+        })?;
+
+        let data_slice = unsafe { 
std::slice::from_raw_parts(host_buffer.ptr(), total_values) };
+        for (i, &val) in data_slice.iter().enumerate() {
+            if !val.is_finite() {
+                let sample_idx = global_sample_offset + (i / sample_size);
+                let angle_idx = i % sample_size;
+                return Err(MahoutError::InvalidInput(format!(
+                    "Sample {} angle {} must be finite, got {}",
+                    sample_idx, angle_idx, val
+                )));
+            }
+        }
+
+        unsafe {
+            crate::profile_scope!("GPU::BatchEncode");
+            let ret = launch_angle_encode_batch(
+                dev_ptr as *const f64,
+                state_ptr_offset,
+                samples_in_chunk,
+                state_len,
+                num_qubits as u32,
+                ctx.stream_compute.stream as *mut c_void,
+            );
+            if ret != 0 {
+                return Err(MahoutError::KernelLaunch(format!(
+                    "Angle encode kernel error: {}",
+                    ret
+                )));
+            }
+        }
+        Ok(())
+    }
+}
diff --git a/qdp/qdp-core/src/encoding/mod.rs b/qdp/qdp-core/src/encoding/mod.rs
index df69941ea..6e770ce94 100644
--- a/qdp/qdp-core/src/encoding/mod.rs
+++ b/qdp/qdp-core/src/encoding/mod.rs
@@ -17,6 +17,7 @@
 //! Streaming encoding implementations for different quantum encoding methods.
 
 mod amplitude;
+mod angle;
 mod basis;
 
 use std::ffi::c_void;
@@ -360,6 +361,10 @@ pub(crate) fn encode_from_parquet(
             crate::profile_scope!("Mahout::EncodeAmplitudeFromParquet");
             stream_encode(engine, path, num_qubits, 
amplitude::AmplitudeEncoder)
         }
+        "angle" => {
+            crate::profile_scope!("Mahout::EncodeAngleFromParquet");
+            stream_encode(engine, path, num_qubits, angle::AngleEncoder)
+        }
         "basis" => {
             crate::profile_scope!("Mahout::EncodeBasisFromParquet");
             stream_encode(engine, path, num_qubits, basis::BasisEncoder)
diff --git a/qdp/qdp-core/src/lib.rs b/qdp/qdp-core/src/lib.rs
index 257916a34..c5bbcf19e 100644
--- a/qdp/qdp-core/src/lib.rs
+++ b/qdp/qdp-core/src/lib.rs
@@ -116,7 +116,7 @@ impl QdpEngine {
     /// * `num_samples` - Number of samples in the batch
     /// * `sample_size` - Size of each sample
     /// * `num_qubits` - Number of qubits
-    /// * `encoding_method` - Strategy (currently only "amplitude" supported 
for batch)
+    /// * `encoding_method` - Strategy: "amplitude", "angle", or "basis"
     ///
     /// # Returns
     /// Single DLPack pointer containing all encoded states (shape: 
[num_samples, 2^num_qubits])
@@ -152,7 +152,7 @@ impl QdpEngine {
     /// # Arguments
     /// * `path` - Path to Parquet file with List<Float64> column
     /// * `num_qubits` - Number of qubits
-    /// * `encoding_method` - Currently only "amplitude" supported for 
streaming
+    /// * `encoding_method` - Strategy: "amplitude", "angle", or "basis"
     ///
     /// # Returns
     /// DLPack pointer to encoded states [num_samples, 2^num_qubits]

Reply via email to