This is an automated email from the ASF dual-hosted git repository.
guanmingchiu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/mahout.git
The following commit(s) were added to refs/heads/main by this push:
new 76a7d1226 MAHOUT-801: Add float32 L2 norm reduction kernel for single vector (#916)
76a7d1226 is described below
commit 76a7d1226ea1182c5644da276002c187168d1bec
Author: Vic Wen <[email protected]>
AuthorDate: Mon Jan 26 06:50:56 2026 +0800
MAHOUT-801: Add float32 L2 norm reduction kernel for single vector (#916)
* feat: Add float32 support for L2 norm reduction kernels and related
functions
* test: add float32 L2 norm reduction tests for single kernels
* fix: include qdp_no_cuda flag
* test: add f32 amplitude encoding test for odd input length
---
qdp/qdp-kernels/src/amplitude.cu | 125 ++++++++++++++++++++++++++++++
qdp/qdp-kernels/src/lib.rs | 23 ++++++
qdp/qdp-kernels/tests/amplitude_encode.rs | 97 ++++++++++++++++++++++-
3 files changed, 244 insertions(+), 1 deletion(-)
diff --git a/qdp/qdp-kernels/src/amplitude.cu b/qdp/qdp-kernels/src/amplitude.cu
index 9a84a2107..0a31c53b6 100644
--- a/qdp/qdp-kernels/src/amplitude.cu
+++ b/qdp/qdp-kernels/src/amplitude.cu
@@ -103,6 +103,14 @@ __device__ __forceinline__ double warp_reduce_sum(double val) {
return val;
}
+// Warp-level reduction for sum using shuffle instructions (float32)
+__device__ __forceinline__ float warp_reduce_sum_f32(float val) {
+ for (int offset = warpSize / 2; offset > 0; offset >>= 1) {
+ val += __shfl_down_sync(0xffffffff, val, offset);
+ }
+ return val;
+}
+
// Block-level reduction built on top of warp reduction
__device__ __forceinline__ double block_reduce_sum(double val) {
__shared__ double shared[32]; // supports up to 1024 threads (32 warps)
@@ -123,6 +131,27 @@ __device__ __forceinline__ double block_reduce_sum(double val) {
return val;
}
+
+// Block-level reduction built on top of warp reduction (float32)
+__device__ __forceinline__ float block_reduce_sum_f32(float val) {
+ __shared__ float shared[32]; // supports up to 1024 threads (32 warps)
+ int lane = threadIdx.x & (warpSize - 1);
+ int warp_id = threadIdx.x >> 5;
+
+ val = warp_reduce_sum_f32(val);
+ if (lane == 0) {
+ shared[warp_id] = val;
+ }
+ __syncthreads();
+
+ // Only first warp participates in final reduction
+    val = (threadIdx.x < (blockDim.x + warpSize - 1) / warpSize) ? shared[lane] : 0.0f;
+ if (warp_id == 0) {
+ val = warp_reduce_sum_f32(val);
+ }
+ return val;
+}
+
extern "C" {
/// Launch amplitude encoding kernel
@@ -351,6 +380,41 @@ __global__ void l2_norm_kernel(
}
}
+/// Kernel: accumulate L2 norm using coalesced vectorized loads (float32).
+/// Each block atomically adds its partial sum to the output accumulator.
+__global__ void l2_norm_kernel_f32(
+ const float* __restrict__ input,
+ size_t input_len,
+ float* __restrict__ out_accum
+) {
+ // Vectorized float2 loads for bandwidth and coalescing
+ const size_t vec_idx = blockIdx.x * blockDim.x + threadIdx.x;
+ const size_t stride = gridDim.x * blockDim.x;
+
+ float local_sum = 0.0f;
+
+ // Process two elements per iteration via float2
+ size_t vec_offset = vec_idx;
+ size_t offset = vec_offset * 2;
+ while (offset + 1 < input_len) {
+        const float2 v = __ldg(reinterpret_cast<const float2*>(input) + vec_offset);
+ local_sum += v.x * v.x + v.y * v.y;
+ vec_offset += stride;
+ offset = vec_offset * 2;
+ }
+
+ // Handle tail element if input_len is odd
+ if (offset < input_len) {
+ const float v = __ldg(input + offset);
+ local_sum += v * v;
+ }
+
+ const float block_sum = block_reduce_sum_f32(local_sum);
+ if (threadIdx.x == 0) {
+ atomicAdd(out_accum, block_sum);
+ }
+}
+
/// Kernel: accumulate L2 norms for a batch.
/// Grid is organized as (blocks_per_sample * num_samples) blocks.
__global__ void l2_norm_batch_kernel(
@@ -408,6 +472,23 @@ __global__ void finalize_inv_norm_kernel(
}
}
+/// Kernel: converts accumulated sum-of-squares into inverse norms (float32).
+__global__ void finalize_inv_norm_kernel_f32(
+ float* __restrict__ norms,
+ size_t count
+) {
+ const size_t idx = blockIdx.x * blockDim.x + threadIdx.x;
+ if (idx >= count) return;
+
+ float sum = norms[idx];
+ // Guard against zero or NaN to avoid inf propagation
+ if (sum <= 0.0f || !isfinite(sum)) {
+ norms[idx] = 0.0f;
+ } else {
+ norms[idx] = rsqrtf(sum);
+ }
+}
+
/// Launch L2 norm reduction for a single vector.
/// Writes the inverse norm (1 / ||x||) into `inv_norm_out_d`.
int launch_l2_norm(
@@ -452,6 +533,50 @@ int launch_l2_norm(
return (int)cudaGetLastError();
}
+/// Launch L2 norm reduction for a single vector (float32).
+/// Writes the inverse norm (1 / ||x||) into `inv_norm_out_d`.
+int launch_l2_norm_f32(
+ const float* input_d,
+ size_t input_len,
+ float* inv_norm_out_d,
+ cudaStream_t stream
+) {
+ if (input_len == 0) {
+ return cudaErrorInvalidValue;
+ }
+
+ cudaError_t memset_status = cudaMemsetAsync(
+ inv_norm_out_d,
+ 0,
+ sizeof(float),
+ stream
+ );
+ if (memset_status != cudaSuccess) {
+ return memset_status;
+ }
+
+ const int blockSize = DEFAULT_BLOCK_SIZE;
+ const size_t elements_per_block = blockSize * 2; // float2 per thread
+    size_t gridSize = (input_len + elements_per_block - 1) / elements_per_block;
+ gridSize = (gridSize == 0) ? 1 : gridSize;
+ const size_t maxBlocks = MAX_GRID_BLOCKS_L2_NORM;
+ if (gridSize > maxBlocks) gridSize = maxBlocks;
+
+ l2_norm_kernel_f32<<<gridSize, blockSize, 0, stream>>>(
+ input_d,
+ input_len,
+ inv_norm_out_d
+ );
+
+ // Finalize: convert accumulated sum to inverse norm
+ finalize_inv_norm_kernel_f32<<<1, 32, 0, stream>>>(
+ inv_norm_out_d,
+ 1
+ );
+
+ return (int)cudaGetLastError();
+}
+
/// Launch L2 norm reduction for a batch of vectors.
/// Writes inverse norms for each sample into `inv_norms_out_d`.
int launch_l2_norm_batch(
diff --git a/qdp/qdp-kernels/src/lib.rs b/qdp/qdp-kernels/src/lib.rs
index e31cb4e1d..de00a2e6b 100644
--- a/qdp/qdp-kernels/src/lib.rs
+++ b/qdp/qdp-kernels/src/lib.rs
@@ -110,6 +110,18 @@ unsafe extern "C" {
stream: *mut c_void,
) -> i32;
+ /// Launch L2 norm reduction (returns inverse norm) for float32
+ /// Returns CUDA error code (0 = success)
+ ///
+ /// # Safety
+ /// Pointers must reference valid device memory on the provided stream.
+ pub fn launch_l2_norm_f32(
+ input_d: *const f32,
+ input_len: usize,
+ inv_norm_out_d: *mut f32,
+ stream: *mut c_void,
+ ) -> i32;
+
/// Launch batched L2 norm reduction (returns inverse norms per sample)
/// Returns CUDA error code (0 = success)
///
@@ -284,6 +296,17 @@ pub extern "C" fn launch_l2_norm_batch(
999
}
+#[cfg(any(not(target_os = "linux"), qdp_no_cuda))]
+#[unsafe(no_mangle)]
+pub extern "C" fn launch_l2_norm_f32(
+ _input_d: *const f32,
+ _input_len: usize,
+ _inv_norm_out_d: *mut f32,
+ _stream: *mut c_void,
+) -> i32 {
+ 999
+}
+
#[cfg(any(not(target_os = "linux"), qdp_no_cuda))]
#[unsafe(no_mangle)]
pub extern "C" fn convert_state_to_float(
diff --git a/qdp/qdp-kernels/tests/amplitude_encode.rs b/qdp/qdp-kernels/tests/amplitude_encode.rs
index 7ee181bfd..5ff9ec1b4 100644
--- a/qdp/qdp-kernels/tests/amplitude_encode.rs
+++ b/qdp/qdp-kernels/tests/amplitude_encode.rs
@@ -26,7 +26,7 @@ use cudarc::driver::{CudaDevice, DevicePtr, DevicePtrMut};
#[cfg(target_os = "linux")]
use qdp_kernels::{
CuComplex, CuDoubleComplex, launch_amplitude_encode, launch_amplitude_encode_f32,
- launch_l2_norm, launch_l2_norm_batch,
+ launch_l2_norm, launch_l2_norm_batch, launch_l2_norm_f32,
};
const EPSILON: f64 = 1e-10;
@@ -282,6 +282,55 @@ fn test_amplitude_encode_odd_input_length() {
println!("PASS: Odd input length handled correctly");
}
+#[test]
+#[cfg(target_os = "linux")]
+fn test_amplitude_encode_odd_input_length_f32() {
+ println!("Testing amplitude encoding with odd input length (float32)...");
+
+ let device = match CudaDevice::new(0) {
+ Ok(d) => d,
+ Err(_) => {
+ println!("SKIP: No CUDA device available");
+ return;
+ }
+ };
+
+ // Test with 3 input values, state size 4
+ let input: Vec<f32> = vec![1.0, 2.0, 2.0];
+ let norm = (1.0_f32 + 4.0 + 4.0).sqrt(); // 3.0
+ let inv_norm = 1.0f32 / norm;
+ let state_len = 4;
+
+ let input_d = device.htod_copy(input.clone()).unwrap();
+ let mut state_d = device.alloc_zeros::<CuComplex>(state_len).unwrap();
+
+ let result = unsafe {
+ launch_amplitude_encode_f32(
+ *input_d.device_ptr() as *const f32,
+ *state_d.device_ptr_mut() as *mut std::ffi::c_void,
+ input.len(),
+ state_len,
+ inv_norm,
+ std::ptr::null_mut(),
+ )
+ };
+
+ assert_eq!(result, 0, "Kernel launch should succeed");
+
+ let state_h = device.dtoh_sync_copy(&state_d).unwrap();
+
+ // Verify: [1/3, 2/3, 2/3, 0]
+ assert!((state_h[0].x - 1.0 / 3.0).abs() < EPSILON_F32);
+ assert!((state_h[1].x - 2.0 / 3.0).abs() < EPSILON_F32);
+ assert!((state_h[2].x - 2.0 / 3.0).abs() < EPSILON_F32);
+ assert!(
+ state_h[3].x.abs() < EPSILON_F32,
+ "Fourth element should be padded with 0"
+ );
+
+ println!("PASS: Odd input length (float32) handled correctly");
+}
+
#[test]
#[cfg(target_os = "linux")]
fn test_amplitude_encode_large_state() {
@@ -622,6 +671,52 @@ fn test_l2_norm_batch_kernel_stream() {
println!("PASS: Batched norm reduction on stream matches CPU");
}
+#[test]
+#[cfg(target_os = "linux")]
+fn test_l2_norm_single_kernel_f32() {
+ println!("Testing L2 norm reduction kernel (float32)...");
+
+ let device = match CudaDevice::new(0) {
+ Ok(d) => d,
+ Err(_) => {
+ println!("SKIP: No CUDA device available");
+ return;
+ }
+ };
+
+ // Test input: [3.0, 4.0] -> norm = 5.0, inv_norm = 0.2
+ let input: Vec<f32> = vec![3.0, 4.0];
+ let expected_norm = (3.0_f32.powi(2) + 4.0_f32.powi(2)).sqrt(); // 5.0
+ let expected_inv_norm = 1.0f32 / expected_norm; // 0.2
+
+ let input_d = device.htod_sync_copy(input.as_slice()).unwrap();
+ let mut inv_norm_d = device.alloc_zeros::<f32>(1).unwrap();
+
+ let status = unsafe {
+ launch_l2_norm_f32(
+ *input_d.device_ptr() as *const f32,
+ input.len(),
+ *inv_norm_d.device_ptr_mut() as *mut f32,
+ std::ptr::null_mut(),
+ )
+ };
+
+ assert_eq!(status, 0, "Norm kernel should succeed");
+ device.synchronize().unwrap();
+
+ let inv_norm_h = device.dtoh_sync_copy(&inv_norm_d).unwrap();
+ let got = inv_norm_h[0];
+
+ assert!(
+ (got - expected_inv_norm).abs() < EPSILON_F32,
+ "Inv norm mismatch: expected {}, got {}",
+ expected_inv_norm,
+ got
+ );
+
+ println!("PASS: Single norm reduction (float32) matches CPU");
+}
+
#[test]
#[cfg(not(target_os = "linux"))]
fn test_amplitude_encode_dummy_non_linux() {