This is an automated email from the ASF dual-hosted git repository.

guanmingchiu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/mahout.git


The following commit(s) were added to refs/heads/main by this push:
     new 011e85194 [QDP] Improve Pytorch Copy Lock (#820)
011e85194 is described below

commit 011e85194c9e09750d3a91eba8a2df5f0e7ac812
Author: KUAN-HAO HUANG <[email protected]>
AuthorDate: Tue Jan 13 20:46:27 2026 +0800

    [QDP] Improve Pytorch Copy Lock (#820)
    
    * to do zero-copy
    
    * benchmark
---
 .../benchmark/benchmark_latency_pytorch.py         | 220 +++++++++++++++++++++
 qdp/qdp-python/src/lib.rs                          |  58 ++++--
 qdp/qdp-python/tests/test_bindings.py              |  28 ++-
 3 files changed, 290 insertions(+), 16 deletions(-)

diff --git a/qdp/qdp-python/benchmark/benchmark_latency_pytorch.py b/qdp/qdp-python/benchmark/benchmark_latency_pytorch.py
new file mode 100755
index 000000000..a0e7b1497
--- /dev/null
+++ b/qdp/qdp-python/benchmark/benchmark_latency_pytorch.py
@@ -0,0 +1,220 @@
+#!/usr/bin/env python3
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+PyTorch Tensor latency benchmark: Tests PyTorch tensor zero-copy optimization.
+
+This is a modified version of benchmark_latency.py that uses PyTorch tensors
+instead of NumPy arrays to test the zero-copy optimization.
+"""
+
+from __future__ import annotations
+
+import argparse
+import queue
+import threading
+import time
+
+import numpy as np
+import torch
+
+from _qdp import QdpEngine
+
+BAR = "=" * 70
+SEP = "-" * 70
+
+
+def sync_cuda() -> None:
+    if torch.cuda.is_available():
+        torch.cuda.synchronize()
+
+
+def build_sample(seed: int, vector_len: int) -> np.ndarray:
+    mask = np.uint64(vector_len - 1)
+    scale = 1.0 / vector_len
+    idx = np.arange(vector_len, dtype=np.uint64)
+    mixed = (idx + np.uint64(seed)) & mask
+    return mixed.astype(np.float64) * scale
+
+
+def prefetched_batches_torch(
+    total_batches: int, batch_size: int, vector_len: int, prefetch: int
+):
+    """Generate batches as PyTorch tensors."""
+    q: queue.Queue[torch.Tensor | None] = queue.Queue(maxsize=prefetch)
+
+    def producer():
+        for batch_idx in range(total_batches):
+            base = batch_idx * batch_size
+            batch = [build_sample(base + i, vector_len) for i in range(batch_size)]
+            # Convert to PyTorch tensor (CPU, float64, contiguous)
+            batch_tensor = torch.tensor(
+                np.stack(batch), dtype=torch.float64, device="cpu"
+            )
+            assert batch_tensor.is_contiguous(), "Tensor should be contiguous"
+            q.put(batch_tensor)
+        q.put(None)
+
+    threading.Thread(target=producer, daemon=True).start()
+
+    while True:
+        batch = q.get()
+        if batch is None:
+            break
+        yield batch
+
+
+def normalize_batch_torch(batch: torch.Tensor) -> torch.Tensor:
+    """Normalize PyTorch tensor batch."""
+    norms = torch.norm(batch, dim=1, keepdim=True)
+    norms = torch.clamp(norms, min=1e-10)  # Avoid division by zero
+    return batch / norms
+
+
+def run_mahout_pytorch(
+    num_qubits: int, total_batches: int, batch_size: int, prefetch: int
+):
+    """Run Mahout benchmark with PyTorch tensor input."""
+    try:
+        engine = QdpEngine(0)
+    except Exception as exc:
+        print(f"[Mahout-PyTorch] Init failed: {exc}")
+        return 0.0, 0.0
+
+    vector_len = 1 << num_qubits
+    sync_cuda()
+    start = time.perf_counter()
+    processed = 0
+
+    for batch_tensor in prefetched_batches_torch(
+        total_batches, batch_size, vector_len, prefetch
+    ):
+        normalized = normalize_batch_torch(batch_tensor)
+        qtensor = engine.encode(normalized, num_qubits, "amplitude")
+        _ = torch.utils.dlpack.from_dlpack(qtensor)
+        processed += normalized.shape[0]
+
+    sync_cuda()
+    duration = time.perf_counter() - start
+    latency_ms = (duration / processed) * 1000 if processed > 0 else 0.0
+    print(f"  Total Time: {duration:.4f} s ({latency_ms:.3f} ms/vector)")
+    return duration, latency_ms
+
+
+def run_mahout_numpy(
+    num_qubits: int, total_batches: int, batch_size: int, prefetch: int
+):
+    """Run Mahout benchmark with NumPy array input (for comparison)."""
+    try:
+        engine = QdpEngine(0)
+    except Exception as exc:
+        print(f"[Mahout-NumPy] Init failed: {exc}")
+        return 0.0, 0.0
+
+    vector_len = 1 << num_qubits
+    sync_cuda()
+    start = time.perf_counter()
+    processed = 0
+
+    # Use the same data generation but keep as NumPy
+    for batch_idx in range(total_batches):
+        base = batch_idx * batch_size
+        batch = [build_sample(base + i, vector_len) for i in range(batch_size)]
+        batch_np = np.stack(batch)
+        norms = np.linalg.norm(batch_np, axis=1, keepdims=True)
+        norms[norms == 0] = 1.0
+        normalized = batch_np / norms
+
+        qtensor = engine.encode(normalized, num_qubits, "amplitude")
+        _ = torch.utils.dlpack.from_dlpack(qtensor)
+        processed += normalized.shape[0]
+
+    sync_cuda()
+    duration = time.perf_counter() - start
+    latency_ms = (duration / processed) * 1000 if processed > 0 else 0.0
+    print(f"  Total Time: {duration:.4f} s ({latency_ms:.3f} ms/vector)")
+    return duration, latency_ms
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description="Benchmark PyTorch Tensor encoding latency (zero-copy 
optimization test)."
+    )
+    parser.add_argument(
+        "--qubits",
+        type=int,
+        default=16,
+        help="Number of qubits (power-of-two vector length).",
+    )
+    parser.add_argument("--batches", type=int, default=100, help="Total 
batches.")
+    parser.add_argument("--batch-size", type=int, default=32, help="Vectors 
per batch.")
+    parser.add_argument(
+        "--prefetch", type=int, default=16, help="CPU-side prefetch depth."
+    )
+    args = parser.parse_args()
+
+    if not torch.cuda.is_available():
+        raise SystemExit("CUDA device not available; GPU is required.")
+
+    total_vectors = args.batches * args.batch_size
+    vector_len = 1 << args.qubits
+
+    print(
+        f"PyTorch Tensor Encoding Benchmark: {args.qubits} Qubits, 
{total_vectors} Samples"
+    )
+    print(f"  Batch size   : {args.batch_size}")
+    print(f"  Vector length: {vector_len}")
+    print(f"  Batches      : {args.batches}")
+    print(f"  Prefetch     : {args.prefetch}")
+    print()
+
+    print(BAR)
+    print(
+        f"PYTORCH TENSOR LATENCY BENCHMARK: {args.qubits} Qubits, 
{total_vectors} Samples"
+    )
+    print(BAR)
+
+    print()
+    print("[Mahout-PyTorch] PyTorch Tensor Input (Zero-Copy Optimization)...")
+    t_pytorch, l_pytorch = run_mahout_pytorch(
+        args.qubits, args.batches, args.batch_size, args.prefetch
+    )
+
+    print()
+    print("[Mahout-NumPy] NumPy Array Input (Baseline)...")
+    t_numpy, l_numpy = run_mahout_numpy(
+        args.qubits, args.batches, args.batch_size, args.prefetch
+    )
+
+    print()
+    print(BAR)
+    print("LATENCY COMPARISON (Lower is Better)")
+    print(f"Samples: {total_vectors}, Qubits: {args.qubits}")
+    print(BAR)
+    print(f"{'PyTorch Tensor':18s} {l_pytorch:10.3f} ms/vector")
+    print(f"{'NumPy Array':18s} {l_numpy:10.3f} ms/vector")
+
+    if l_numpy > 0 and l_pytorch > 0:
+        print(SEP)
+        speedup = l_numpy / l_pytorch
+        improvement = ((l_numpy - l_pytorch) / l_numpy) * 100
+        print(f"Speedup: {speedup:.2f}x")
+        print(f"Improvement: {improvement:.1f}%")
+
+
+if __name__ == "__main__":
+    main()
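
For reference, a minimal standalone sketch (not part of the commit) of the encode
path the benchmark above exercises: a C-contiguous float64 CPU tensor goes straight
into QdpEngine.encode and comes back through DLPack. QdpEngine, the "amplitude"
method, and the DLPack round-trip all appear in the diff; device index 0 and a
CUDA-capable GPU are assumptions.

    import torch
    from _qdp import QdpEngine

    engine = QdpEngine(0)  # assumes GPU device 0
    num_qubits = 2
    # (batch_size, features) with features == 1 << num_qubits, C-contiguous float64
    batch = torch.rand(3, 1 << num_qubits, dtype=torch.float64)
    batch = batch / torch.clamp(torch.norm(batch, dim=1, keepdim=True), min=1e-10)

    qtensor = engine.encode(batch, num_qubits, "amplitude")  # zero-copy CPU-tensor path
    gpu_view = torch.from_dlpack(qtensor)  # materialize the result on the GPU
    assert gpu_view.is_cuda and gpu_view.shape == (3, 1 << num_qubits)
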
diff --git a/qdp/qdp-python/src/lib.rs b/qdp/qdp-python/src/lib.rs
index 686795c5e..1bd25d798 100644
--- a/qdp/qdp-python/src/lib.rs
+++ b/qdp/qdp-python/src/lib.rs
@@ -14,7 +14,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use numpy::{PyReadonlyArray1, PyReadonlyArray2, PyUntypedArrayMethods};
+use numpy::{PyReadonlyArray1, PyReadonlyArray2, PyReadonlyArrayDyn, PyUntypedArrayMethods};
 use pyo3::exceptions::PyRuntimeError;
 use pyo3::ffi;
 use pyo3::prelude::*;
@@ -217,7 +217,7 @@ impl QdpEngine {
     ///     data: Input data - supports:
     ///         - Python list: [1.0, 2.0, 3.0, 4.0]
     ///         - NumPy array: 1D (single sample) or 2D (batch) array
-    ///         - PyTorch tensor: CPU tensor (will be copied to GPU)
+    ///         - PyTorch tensor: CPU float64 tensor (C-contiguous recommended; converted via NumPy view)
     ///         - String path: .parquet, .arrow, .npy file
     ///         - pathlib.Path: Path object (converted via os.fspath())
     ///     num_qubits: Number of qubits for encoding
@@ -322,18 +322,44 @@ impl QdpEngine {
         // Check if it's a PyTorch tensor
         if is_pytorch_tensor(data)? {
             validate_tensor(data)?;
-            // NOTE(perf): `tolist()` + `extract()` makes extra copies (Tensor -> Python list -> Vec).
-            // TODO: Follow-up PR can use `numpy()`/buffer protocol (and possibly pinned host memory)
-            // to reduce copy overhead.
+            // PERF: Avoid Tensor -> Python list -> Vec deep copies.
+            //
+            // For CPU tensors, `tensor.detach().numpy()` returns a NumPy view that shares the same
+            // underlying memory (zero-copy) when the tensor is C-contiguous. We can then borrow a
+            // `&[f64]` directly via pyo3-numpy.
             let ndim: usize = data.call_method0("dim")?.extract()?;
+            let numpy_view = data
+                .call_method0("detach")?
+                .call_method0("numpy")
+                .map_err(|_| {
+                    PyRuntimeError::new_err(
+                        "Failed to convert torch.Tensor to NumPy view. Ensure 
the tensor is on CPU \
+                         and does not require grad (try: tensor = 
tensor.detach().cpu())",
+                    )
+                })?;
+
+            let array = numpy_view
+                .extract::<PyReadonlyArrayDyn<f64>>()
+                .map_err(|_| {
+                    PyRuntimeError::new_err(
+                        "Failed to extract NumPy view as float64 array. Ensure 
dtype is float64 \
+                         (try: tensor = tensor.to(torch.float64))",
+                    )
+                })?;
+
+            let data_slice = array.as_slice().map_err(|_| {
+                PyRuntimeError::new_err(
+                    "Tensor must be contiguous (C-order) to get zero-copy 
slice \
+                     (try: tensor = tensor.contiguous())",
+                )
+            })?;
 
             match ndim {
                 1 => {
                     // 1D tensor: single sample encoding
-                    let vec_data: Vec<f64> = data.call_method0("tolist")?.extract()?;
                     let ptr = self
                         .engine
-                        .encode(&vec_data, num_qubits, encoding_method)
+                        .encode(data_slice, num_qubits, encoding_method)
                         .map_err(|e| PyRuntimeError::new_err(format!("Encoding 
failed: {}", e)))?;
                     return Ok(QuantumTensor {
                         ptr,
@@ -342,17 +368,19 @@ impl QdpEngine {
                 }
                 2 => {
                     // 2D tensor: batch encoding
-                    let shape: Vec<i64> = data.getattr("shape")?.extract()?;
-                    let num_samples = shape[0] as usize;
-                    let sample_size = shape[1] as usize;
-                    let vec_data: Vec<f64> = data
-                        .call_method0("flatten")?
-                        .call_method0("tolist")?
-                        .extract()?;
+                    let shape = array.shape();
+                    if shape.len() != 2 {
+                        return Err(PyRuntimeError::new_err(format!(
+                            "Unsupported tensor shape: {}D. Expected 2D tensor 
(batch_size, features).",
+                            shape.len()
+                        )));
+                    }
+                    let num_samples = shape[0];
+                    let sample_size = shape[1];
                     let ptr = self
                         .engine
                         .encode_batch(
-                            &vec_data,
+                            data_slice,
                             num_samples,
                             sample_size,
                             num_qubits,
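
A quick standalone check (not in the commit) of the property the new Rust path
relies on: for a CPU tensor, detach().numpy() returns a view over the tensor's
own storage, so the `&[f64]` borrowed on the Rust side aliases the original
buffer rather than a copy. The contiguity caveat is also visible from Python:

    import torch

    t = torch.zeros(4, dtype=torch.float64)
    view = t.detach().numpy()      # zero-copy NumPy view, same storage
    view[0] = 1.0
    assert t[0].item() == 1.0      # a write through the view is visible in the tensor

    # A transposed tensor still converts, but the resulting array is not
    # C-contiguous, so the Rust side's as_slice() would reject it.
    nc = torch.zeros(2, 3, dtype=torch.float64).t()
    assert not nc.detach().numpy().flags["C_CONTIGUOUS"]
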
diff --git a/qdp/qdp-python/tests/test_bindings.py b/qdp/qdp-python/tests/test_bindings.py
index 0efb3576e..b4ba087ab 100644
--- a/qdp/qdp-python/tests/test_bindings.py
+++ b/qdp/qdp-python/tests/test_bindings.py
@@ -154,7 +154,7 @@ def test_pytorch_precision_float64():
 
 @pytest.mark.gpu
 def test_encode_tensor_cpu():
-    """Test encoding from CPU PyTorch tensor."""
+    """Test encoding from CPU PyTorch tensor (1D, single sample)."""
     pytest.importorskip("torch")
     import torch
     from _qdp import QdpEngine
@@ -172,6 +172,32 @@ def test_encode_tensor_cpu():
     assert torch_tensor.shape == (1, 4)
 
 
[email protected]
+def test_encode_tensor_batch():
+    """Test encoding from CPU PyTorch tensor (2D, batch encoding with 
zero-copy)."""
+    pytest.importorskip("torch")
+    import torch
+    from _qdp import QdpEngine
+
+    if not torch.cuda.is_available():
+        pytest.skip("GPU required for QdpEngine")
+
+    engine = QdpEngine(0)
+    # Create 2D tensor (batch_size=3, features=4)
+    data = torch.tensor(
+        [[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0], [9.0, 10.0, 11.0, 12.0]],
+        dtype=torch.float64,
+    )
+    assert data.is_contiguous(), "Test tensor should be contiguous for zero-copy"
+
+    qtensor = engine.encode(data, 2, "amplitude")
+
+    # Verify result
+    torch_tensor = torch.from_dlpack(qtensor)
+    assert torch_tensor.is_cuda
+    assert torch_tensor.shape == (3, 4), "Batch encoding should preserve batch size"
+
+
 @pytest.mark.gpu
 def test_encode_errors():
     """Test error handling for unified encode method."""
