This is an automated email from the ASF dual-hosted git repository.
guanmingchiu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/mahout.git
The following commit(s) were added to refs/heads/main by this push:
new 011e85194 [QDP] Improve Pytorch Copy Lock (#820)
011e85194 is described below
commit 011e85194c9e09750d3a91eba8a2df5f0e7ac812
Author: KUAN-HAO HUANG <[email protected]>
AuthorDate: Tue Jan 13 20:46:27 2026 +0800
[QDP] Improve Pytorch Copy Lock (#820)
* use zero-copy conversion for PyTorch tensor input
* add PyTorch latency benchmark
---
.../benchmark/benchmark_latency_pytorch.py | 220 +++++++++++++++++++++
qdp/qdp-python/src/lib.rs | 58 ++++--
qdp/qdp-python/tests/test_bindings.py | 28 ++-
3 files changed, 290 insertions(+), 16 deletions(-)
diff --git a/qdp/qdp-python/benchmark/benchmark_latency_pytorch.py b/qdp/qdp-python/benchmark/benchmark_latency_pytorch.py
new file mode 100755
index 000000000..a0e7b1497
--- /dev/null
+++ b/qdp/qdp-python/benchmark/benchmark_latency_pytorch.py
@@ -0,0 +1,220 @@
+#!/usr/bin/env python3
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+PyTorch Tensor latency benchmark: Tests PyTorch tensor zero-copy optimization.
+
+This is a modified version of benchmark_latency.py that uses PyTorch tensors
+instead of NumPy arrays to test the zero-copy optimization.
+"""
+
+from __future__ import annotations
+
+import argparse
+import queue
+import threading
+import time
+
+import numpy as np
+import torch
+
+from _qdp import QdpEngine
+
+BAR = "=" * 70
+SEP = "-" * 70
+
+
+def sync_cuda() -> None:
+ if torch.cuda.is_available():
+ torch.cuda.synchronize()
+
+
+def build_sample(seed: int, vector_len: int) -> np.ndarray:
+ mask = np.uint64(vector_len - 1)
+ scale = 1.0 / vector_len
+ idx = np.arange(vector_len, dtype=np.uint64)
+ mixed = (idx + np.uint64(seed)) & mask
+ return mixed.astype(np.float64) * scale
+
+
+def prefetched_batches_torch(
+ total_batches: int, batch_size: int, vector_len: int, prefetch: int
+):
+ """Generate batches as PyTorch tensors."""
+ q: queue.Queue[torch.Tensor | None] = queue.Queue(maxsize=prefetch)
+
+ def producer():
+ for batch_idx in range(total_batches):
+ base = batch_idx * batch_size
+            batch = [build_sample(base + i, vector_len) for i in range(batch_size)]
+ # Convert to PyTorch tensor (CPU, float64, contiguous)
+ batch_tensor = torch.tensor(
+ np.stack(batch), dtype=torch.float64, device="cpu"
+ )
+ assert batch_tensor.is_contiguous(), "Tensor should be contiguous"
+ q.put(batch_tensor)
+ q.put(None)
+
+ threading.Thread(target=producer, daemon=True).start()
+
+ while True:
+ batch = q.get()
+ if batch is None:
+ break
+ yield batch
+
+
+def normalize_batch_torch(batch: torch.Tensor) -> torch.Tensor:
+ """Normalize PyTorch tensor batch."""
+ norms = torch.norm(batch, dim=1, keepdim=True)
+ norms = torch.clamp(norms, min=1e-10) # Avoid division by zero
+ return batch / norms
+
+
+def run_mahout_pytorch(
+ num_qubits: int, total_batches: int, batch_size: int, prefetch: int
+):
+ """Run Mahout benchmark with PyTorch tensor input."""
+ try:
+ engine = QdpEngine(0)
+ except Exception as exc:
+ print(f"[Mahout-PyTorch] Init failed: {exc}")
+ return 0.0, 0.0
+
+ vector_len = 1 << num_qubits
+ sync_cuda()
+ start = time.perf_counter()
+ processed = 0
+
+ for batch_tensor in prefetched_batches_torch(
+ total_batches, batch_size, vector_len, prefetch
+ ):
+ normalized = normalize_batch_torch(batch_tensor)
+ qtensor = engine.encode(normalized, num_qubits, "amplitude")
+ _ = torch.utils.dlpack.from_dlpack(qtensor)
+ processed += normalized.shape[0]
+
+ sync_cuda()
+ duration = time.perf_counter() - start
+ latency_ms = (duration / processed) * 1000 if processed > 0 else 0.0
+ print(f" Total Time: {duration:.4f} s ({latency_ms:.3f} ms/vector)")
+ return duration, latency_ms
+
+
+def run_mahout_numpy(
+ num_qubits: int, total_batches: int, batch_size: int, prefetch: int
+):
+ """Run Mahout benchmark with NumPy array input (for comparison)."""
+ try:
+ engine = QdpEngine(0)
+ except Exception as exc:
+ print(f"[Mahout-NumPy] Init failed: {exc}")
+ return 0.0, 0.0
+
+ vector_len = 1 << num_qubits
+ sync_cuda()
+ start = time.perf_counter()
+ processed = 0
+
+ # Use the same data generation but keep as NumPy
+ for batch_idx in range(total_batches):
+ base = batch_idx * batch_size
+ batch = [build_sample(base + i, vector_len) for i in range(batch_size)]
+ batch_np = np.stack(batch)
+ norms = np.linalg.norm(batch_np, axis=1, keepdims=True)
+ norms[norms == 0] = 1.0
+ normalized = batch_np / norms
+
+ qtensor = engine.encode(normalized, num_qubits, "amplitude")
+ _ = torch.utils.dlpack.from_dlpack(qtensor)
+ processed += normalized.shape[0]
+
+ sync_cuda()
+ duration = time.perf_counter() - start
+ latency_ms = (duration / processed) * 1000 if processed > 0 else 0.0
+ print(f" Total Time: {duration:.4f} s ({latency_ms:.3f} ms/vector)")
+ return duration, latency_ms
+
+
+def main():
+ parser = argparse.ArgumentParser(
+ description="Benchmark PyTorch Tensor encoding latency (zero-copy
optimization test)."
+ )
+ parser.add_argument(
+ "--qubits",
+ type=int,
+ default=16,
+ help="Number of qubits (power-of-two vector length).",
+ )
+ parser.add_argument("--batches", type=int, default=100, help="Total
batches.")
+ parser.add_argument("--batch-size", type=int, default=32, help="Vectors
per batch.")
+ parser.add_argument(
+ "--prefetch", type=int, default=16, help="CPU-side prefetch depth."
+ )
+ args = parser.parse_args()
+
+ if not torch.cuda.is_available():
+ raise SystemExit("CUDA device not available; GPU is required.")
+
+ total_vectors = args.batches * args.batch_size
+ vector_len = 1 << args.qubits
+
+ print(
+ f"PyTorch Tensor Encoding Benchmark: {args.qubits} Qubits,
{total_vectors} Samples"
+ )
+ print(f" Batch size : {args.batch_size}")
+ print(f" Vector length: {vector_len}")
+ print(f" Batches : {args.batches}")
+ print(f" Prefetch : {args.prefetch}")
+ print()
+
+ print(BAR)
+ print(
+ f"PYTORCH TENSOR LATENCY BENCHMARK: {args.qubits} Qubits,
{total_vectors} Samples"
+ )
+ print(BAR)
+
+ print()
+ print("[Mahout-PyTorch] PyTorch Tensor Input (Zero-Copy Optimization)...")
+ t_pytorch, l_pytorch = run_mahout_pytorch(
+ args.qubits, args.batches, args.batch_size, args.prefetch
+ )
+
+ print()
+ print("[Mahout-NumPy] NumPy Array Input (Baseline)...")
+ t_numpy, l_numpy = run_mahout_numpy(
+ args.qubits, args.batches, args.batch_size, args.prefetch
+ )
+
+ print()
+ print(BAR)
+ print("LATENCY COMPARISON (Lower is Better)")
+ print(f"Samples: {total_vectors}, Qubits: {args.qubits}")
+ print(BAR)
+ print(f"{'PyTorch Tensor':18s} {l_pytorch:10.3f} ms/vector")
+ print(f"{'NumPy Array':18s} {l_numpy:10.3f} ms/vector")
+
+ if l_numpy > 0 and l_pytorch > 0:
+ print(SEP)
+ speedup = l_numpy / l_pytorch
+ improvement = ((l_numpy - l_pytorch) / l_numpy) * 100
+ print(f"Speedup: {speedup:.2f}x")
+ print(f"Improvement: {improvement:.1f}%")
+
+
+if __name__ == "__main__":
+ main()
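The benchmark's zero-copy path relies on `torch.Tensor.detach().numpy()` returning a NumPy view over the tensor's buffer rather than a copy. A minimal sketch of that behaviour, assuming a recent PyTorch build and the same contiguous CPU float64 layout the producer above creates:

    import numpy as np
    import torch

    # Contiguous CPU float64 tensor, like the batches from prefetched_batches_torch.
    t = torch.tensor(np.stack([np.arange(4, dtype=np.float64)] * 2),
                     dtype=torch.float64, device="cpu")
    assert t.is_contiguous()

    view = t.detach().numpy()      # NumPy view sharing the tensor's memory (no copy)
    view[0, 0] = 42.0              # a write through the view is visible in the tensor
    assert t[0, 0].item() == 42.0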
diff --git a/qdp/qdp-python/src/lib.rs b/qdp/qdp-python/src/lib.rs
index 686795c5e..1bd25d798 100644
--- a/qdp/qdp-python/src/lib.rs
+++ b/qdp/qdp-python/src/lib.rs
@@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use numpy::{PyReadonlyArray1, PyReadonlyArray2, PyUntypedArrayMethods};
+use numpy::{PyReadonlyArray1, PyReadonlyArray2, PyReadonlyArrayDyn, PyUntypedArrayMethods};
use pyo3::exceptions::PyRuntimeError;
use pyo3::ffi;
use pyo3::prelude::*;
@@ -217,7 +217,7 @@ impl QdpEngine {
/// data: Input data - supports:
/// - Python list: [1.0, 2.0, 3.0, 4.0]
/// - NumPy array: 1D (single sample) or 2D (batch) array
- /// - PyTorch tensor: CPU tensor (will be copied to GPU)
+    ///     - PyTorch tensor: CPU float64 tensor (C-contiguous recommended; converted via NumPy view)
/// - String path: .parquet, .arrow, .npy file
/// - pathlib.Path: Path object (converted via os.fspath())
/// num_qubits: Number of qubits for encoding
@@ -322,18 +322,44 @@ impl QdpEngine {
// Check if it's a PyTorch tensor
if is_pytorch_tensor(data)? {
validate_tensor(data)?;
-        // NOTE(perf): `tolist()` + `extract()` makes extra copies (Tensor -> Python list -> Vec).
-        // TODO: Follow-up PR can use `numpy()`/buffer protocol (and possibly pinned host memory)
- // to reduce copy overhead.
+ // PERF: Avoid Tensor -> Python list -> Vec deep copies.
+ //
+        // For CPU tensors, `tensor.detach().numpy()` returns a NumPy view that shares the same
+        // underlying memory (zero-copy) when the tensor is C-contiguous. We can then borrow a
+ // `&[f64]` directly via pyo3-numpy.
let ndim: usize = data.call_method0("dim")?.extract()?;
+ let numpy_view = data
+ .call_method0("detach")?
+ .call_method0("numpy")
+ .map_err(|_| {
+ PyRuntimeError::new_err(
+                        "Failed to convert torch.Tensor to NumPy view. Ensure the tensor is on CPU \
+                         and does not require grad (try: tensor = tensor.detach().cpu())",
+ )
+ })?;
+
+ let array = numpy_view
+ .extract::<PyReadonlyArrayDyn<f64>>()
+ .map_err(|_| {
+ PyRuntimeError::new_err(
+                        "Failed to extract NumPy view as float64 array. Ensure dtype is float64 \
+ (try: tensor = tensor.to(torch.float64))",
+ )
+ })?;
+
+ let data_slice = array.as_slice().map_err(|_| {
+ PyRuntimeError::new_err(
+                    "Tensor must be contiguous (C-order) to get zero-copy slice \
+ (try: tensor = tensor.contiguous())",
+ )
+ })?;
match ndim {
1 => {
// 1D tensor: single sample encoding
-                    let vec_data: Vec<f64> = data.call_method0("tolist")?.extract()?;
let ptr = self
.engine
- .encode(&vec_data, num_qubits, encoding_method)
+ .encode(data_slice, num_qubits, encoding_method)
                         .map_err(|e| PyRuntimeError::new_err(format!("Encoding failed: {}", e)))?;
return Ok(QuantumTensor {
ptr,
@@ -342,17 +368,19 @@ impl QdpEngine {
}
2 => {
// 2D tensor: batch encoding
- let shape: Vec<i64> = data.getattr("shape")?.extract()?;
- let num_samples = shape[0] as usize;
- let sample_size = shape[1] as usize;
- let vec_data: Vec<f64> = data
- .call_method0("flatten")?
- .call_method0("tolist")?
- .extract()?;
+ let shape = array.shape();
+ if shape.len() != 2 {
+ return Err(PyRuntimeError::new_err(format!(
+                            "Unsupported tensor shape: {}D. Expected 2D tensor (batch_size, features).",
+ shape.len()
+ )));
+ }
+ let num_samples = shape[0];
+ let sample_size = shape[1];
let ptr = self
.engine
.encode_batch(
- &vec_data,
+ data_slice,
num_samples,
sample_size,
num_qubits,
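
The error messages in the new conversion path double as usage hints: the zero-copy route applies to detached, CPU-resident, float64, C-contiguous tensors. A minimal sketch of preparing an arbitrary tensor before calling `QdpEngine.encode` (the `prepare_for_encode` helper is illustrative, not part of the bindings, and a CUDA device is assumed for the engine):

    import torch
    from _qdp import QdpEngine

    def prepare_for_encode(t: torch.Tensor) -> torch.Tensor:
        # Mirror the hints in the error messages above:
        # detached, on CPU, float64 dtype, C-contiguous layout.
        return t.detach().cpu().to(torch.float64).contiguous()

    engine = QdpEngine(0)                          # assumes a CUDA-capable GPU
    batch = prepare_for_encode(torch.rand(3, 4))   # (batch_size=3, features=4)
    qtensor = engine.encode(batch, 2, "amplitude")
    gpu_tensor = torch.from_dlpack(qtensor)        # same pattern as the tests below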
diff --git a/qdp/qdp-python/tests/test_bindings.py b/qdp/qdp-python/tests/test_bindings.py
index 0efb3576e..b4ba087ab 100644
--- a/qdp/qdp-python/tests/test_bindings.py
+++ b/qdp/qdp-python/tests/test_bindings.py
@@ -154,7 +154,7 @@ def test_pytorch_precision_float64():
@pytest.mark.gpu
def test_encode_tensor_cpu():
- """Test encoding from CPU PyTorch tensor."""
+ """Test encoding from CPU PyTorch tensor (1D, single sample)."""
pytest.importorskip("torch")
import torch
from _qdp import QdpEngine
@@ -172,6 +172,32 @@ def test_encode_tensor_cpu():
assert torch_tensor.shape == (1, 4)
[email protected]
+def test_encode_tensor_batch():
+ """Test encoding from CPU PyTorch tensor (2D, batch encoding with
zero-copy)."""
+ pytest.importorskip("torch")
+ import torch
+ from _qdp import QdpEngine
+
+ if not torch.cuda.is_available():
+ pytest.skip("GPU required for QdpEngine")
+
+ engine = QdpEngine(0)
+ # Create 2D tensor (batch_size=3, features=4)
+ data = torch.tensor(
+ [[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0], [9.0, 10.0, 11.0, 12.0]],
+ dtype=torch.float64,
+ )
+    assert data.is_contiguous(), "Test tensor should be contiguous for zero-copy"
+
+ qtensor = engine.encode(data, 2, "amplitude")
+
+ # Verify result
+ torch_tensor = torch.from_dlpack(qtensor)
+ assert torch_tensor.is_cuda
+    assert torch_tensor.shape == (3, 4), "Batch encoding should preserve batch size"
+
+
@pytest.mark.gpu
def test_encode_errors():
"""Test error handling for unified encode method."""