This is an automated email from the ASF dual-hosted git repository.
guanmingchiu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/mahout.git
The following commit(s) were added to refs/heads/main by this push:
new ccfc116ff [QDP] Add encoding method parameter to all benchmarks (#840)
ccfc116ff is described below
commit ccfc116ff6fa887e379d7ba2e94a9aa915ad34f7
Author: Guan-Ming (Wesley) Chiu <[email protected]>
AuthorDate: Sat Jan 17 22:17:27 2026 +0800
[QDP] Add encoding method parameter to all benchmarks (#840)
* Add encoding method parameter to all benchmarks
* Update benchmark readme
---
qdp/qdp-python/benchmark/README.md | 6 +
qdp/qdp-python/benchmark/benchmark_e2e.py | 49 +++--
qdp/qdp-python/benchmark/benchmark_latency.py | 69 +++----
.../benchmark/benchmark_latency_pytorch.py | 92 ++++-----
qdp/qdp-python/benchmark/benchmark_numpy_io.py | 37 +++-
qdp/qdp-python/benchmark/benchmark_throughput.py | 68 +++----
qdp/qdp-python/benchmark/utils.py | 222 +++++++++++++++++++++
7 files changed, 379 insertions(+), 164 deletions(-)
diff --git a/qdp/qdp-python/benchmark/README.md
b/qdp/qdp-python/benchmark/README.md
index d0ea49b29..e5d50b3e2 100644
--- a/qdp/qdp-python/benchmark/README.md
+++ b/qdp/qdp-python/benchmark/README.md
@@ -43,12 +43,14 @@ Additional options:
```bash
python benchmark_e2e.py --qubits 16 --samples 200 --frameworks mahout-parquet mahout-arrow
python benchmark_e2e.py --frameworks all
+python benchmark_e2e.py --encoding-method basis
```
Notes:
- `--frameworks` accepts a space-separated list or `all`.
Options: `mahout-parquet`, `mahout-arrow`, `pennylane`, `qiskit`.
+- `--encoding-method` selects the encoding method: `amplitude` (default) or `basis`.
- The script writes `final_benchmark_data.parquet` and
`final_benchmark_data.arrow` in the current working directory and overwrites
them on each run.
@@ -61,12 +63,14 @@ Notes:
cd qdp/qdp-python/benchmark
python benchmark_latency.py --qubits 16 --batches 200 --batch-size 64 --prefetch 16
python benchmark_latency.py --frameworks mahout,pennylane
+python benchmark_latency.py --encoding-method basis
```
Notes:
- `--frameworks` is a comma-separated list or `all`.
Options: `mahout`, `pennylane`, `qiskit-init`, `qiskit-statevector`.
+- `--encoding-method` selects the encoding method: `amplitude` (default) or `basis`.
- The latency test reports average milliseconds per vector.
- Flags:
- `--qubits`: controls vector length (`2^qubits`).
@@ -88,12 +92,14 @@ output.
cd qdp/qdp-python/benchmark
python benchmark_throughput.py --qubits 16 --batches 200 --batch-size 64 --prefetch 16
python benchmark_throughput.py --frameworks mahout,pennylane
+python benchmark_throughput.py --encoding-method basis
```
Notes:
- `--frameworks` is a comma-separated list or `all`.
Options: `mahout`, `pennylane`, `qiskit`.
+- `--encoding-method` selects the encoding method: `amplitude` (default) or `basis`.
- Throughput is reported in vectors/sec (higher is better).
## Dependency Notes
diff --git a/qdp/qdp-python/benchmark/benchmark_e2e.py
b/qdp/qdp-python/benchmark/benchmark_e2e.py
index b0e19be8f..4e22dfb58 100644
--- a/qdp/qdp-python/benchmark/benchmark_e2e.py
+++ b/qdp/qdp-python/benchmark/benchmark_e2e.py
@@ -40,6 +40,7 @@ import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.ipc as ipc
from _qdp import QdpEngine
+from utils import generate_batch_data
# Competitors
try:
@@ -81,7 +82,7 @@ class DummyQNN(nn.Module):
return self.fc(x)
-def generate_data(n_qubits, n_samples):
+def generate_data(n_qubits, n_samples, encoding_method: str = "amplitude"):
for f in [DATA_FILE, ARROW_FILE]:
if os.path.exists(f):
os.remove(f)
@@ -90,8 +91,7 @@ def generate_data(n_qubits, n_samples):
dim = 1 << n_qubits
# Generate all data at once
- np.random.seed(42)
- all_data = np.random.rand(n_samples, dim).astype(np.float64)
+ all_data = generate_batch_data(n_samples, dim, encoding_method, seed=42)
# Save as Parquet (List format for PennyLane/Qiskit)
feature_vectors = [row.tolist() for row in all_data]
@@ -101,8 +101,14 @@ def generate_data(n_qubits, n_samples):
pq.write_table(table, DATA_FILE)
# Save as Arrow IPC (FixedSizeList format for Mahout)
- arr = pa.FixedSizeListArray.from_arrays(pa.array(all_data.flatten()), dim)
- arrow_table = pa.table({"data": arr})
+ if encoding_method == "basis":
+ # For basis encoding, create a simple array of indices
+ arr = pa.array(all_data.flatten(), type=pa.float64())
+ arrow_table = pa.table({"data": arr})
+ else:
+ # For amplitude encoding, use FixedSizeList format
+ arr = pa.FixedSizeListArray.from_arrays(pa.array(all_data.flatten()),
dim)
+ arrow_table = pa.table({"data": arr})
with ipc.RecordBatchFileWriter(ARROW_FILE, arrow_table.schema) as writer:
writer.write_table(arrow_table)
@@ -255,7 +261,7 @@ def run_pennylane(n_qubits, n_samples):
# -----------------------------------------------------------
# 3. Mahout Parquet Pipeline
# -----------------------------------------------------------
-def run_mahout_parquet(engine, n_qubits, n_samples):
+def run_mahout_parquet(engine, n_qubits, n_samples, encoding_method: str =
"amplitude"):
# Clean cache before starting benchmark
clean_cache()
@@ -267,7 +273,13 @@ def run_mahout_parquet(engine, n_qubits, n_samples):
# Direct Parquet to GPU pipeline
parquet_encode_start = time.perf_counter()
- qtensor = engine.encode(DATA_FILE, n_qubits)
+ try:
+ qtensor = engine.encode(DATA_FILE, n_qubits, encoding_method)
+ except RuntimeError as e:
+ if "Only amplitude encoding supported" in str(e):
+ print("Basis encoding not supported for streaming from Parquet,
skipping.")
+ return 0.0, None
+ raise
parquet_encode_time = time.perf_counter() - parquet_encode_start
print(f" Parquet->GPU (IO+Encode): {parquet_encode_time:.4f} s")
@@ -307,7 +319,7 @@ def run_mahout_parquet(engine, n_qubits, n_samples):
# -----------------------------------------------------------
# 4. Mahout Arrow IPC Pipeline
# -----------------------------------------------------------
-def run_mahout_arrow(engine, n_qubits, n_samples):
+def run_mahout_arrow(engine, n_qubits, n_samples, encoding_method: str =
"amplitude"):
# Clean cache before starting benchmark
clean_cache()
@@ -318,7 +330,13 @@ def run_mahout_arrow(engine, n_qubits, n_samples):
start_time = time.perf_counter()
arrow_encode_start = time.perf_counter()
- qtensor = engine.encode(ARROW_FILE, n_qubits)
+ try:
+ qtensor = engine.encode(ARROW_FILE, n_qubits, encoding_method)
+ except RuntimeError as e:
+ if "Only amplitude encoding supported" in str(e):
+ print(" Basis encoding not supported for streaming from Arrow,
skipping.")
+ return 0.0, None
+ raise
arrow_encode_time = time.perf_counter() - arrow_encode_start
print(f" Arrow->GPU (IO+Encode): {arrow_encode_time:.4f} s")
@@ -408,13 +426,20 @@ if __name__ == "__main__":
choices=["mahout-parquet", "mahout-arrow", "pennylane", "qiskit",
"all"],
help="Frameworks to benchmark. Use 'all' to run all available
frameworks.",
)
+ parser.add_argument(
+ "--encoding-method",
+ type=str,
+ default="amplitude",
+ choices=["amplitude", "basis"],
+ help="Encoding method to use for Mahout (amplitude or basis).",
+ )
args = parser.parse_args()
# Expand "all" option
if "all" in args.frameworks:
args.frameworks = ["mahout-parquet", "mahout-arrow", "pennylane",
"qiskit"]
- generate_data(args.qubits, args.samples)
+ generate_data(args.qubits, args.samples, args.encoding_method)
try:
engine = QdpEngine(0)
@@ -448,14 +473,14 @@ if __name__ == "__main__":
if "mahout-parquet" in args.frameworks:
t_mahout_parquet, mahout_parquet_all_states = run_mahout_parquet(
- engine, args.qubits, args.samples
+ engine, args.qubits, args.samples, args.encoding_method
)
# Clean cache between framework benchmarks
clean_cache()
if "mahout-arrow" in args.frameworks:
t_mahout_arrow, mahout_arrow_all_states = run_mahout_arrow(
- engine, args.qubits, args.samples
+ engine, args.qubits, args.samples, args.encoding_method
)
# Clean cache between framework benchmarks
clean_cache()
diff --git a/qdp/qdp-python/benchmark/benchmark_latency.py
b/qdp/qdp-python/benchmark/benchmark_latency.py
index 0b89d8fd9..252647d4a 100644
--- a/qdp/qdp-python/benchmark/benchmark_latency.py
+++ b/qdp/qdp-python/benchmark/benchmark_latency.py
@@ -26,14 +26,12 @@ Run:
from __future__ import annotations
import argparse
-import queue
-import threading
import time
-import numpy as np
import torch
from _qdp import QdpEngine
+from utils import normalize_batch, prefetched_batches
BAR = "=" * 70
SEP = "-" * 70
@@ -67,41 +65,6 @@ def sync_cuda() -> None:
torch.cuda.synchronize()
-def build_sample(seed: int, vector_len: int) -> np.ndarray:
- mask = np.uint64(vector_len - 1)
- scale = 1.0 / vector_len
- idx = np.arange(vector_len, dtype=np.uint64)
- mixed = (idx + np.uint64(seed)) & mask
- return mixed.astype(np.float64) * scale
-
-
-def prefetched_batches(
- total_batches: int, batch_size: int, vector_len: int, prefetch: int
-):
- q: queue.Queue[np.ndarray | None] = queue.Queue(maxsize=prefetch)
-
- def producer():
- for batch_idx in range(total_batches):
- base = batch_idx * batch_size
- batch = [build_sample(base + i, vector_len) for i in
range(batch_size)]
- q.put(np.stack(batch))
- q.put(None)
-
- threading.Thread(target=producer, daemon=True).start()
-
- while True:
- batch = q.get()
- if batch is None:
- break
- yield batch
-
-
-def normalize_batch(batch: np.ndarray) -> np.ndarray:
- norms = np.linalg.norm(batch, axis=1, keepdims=True)
- norms[norms == 0] = 1.0
- return batch / norms
-
-
def parse_frameworks(raw: str) -> list[str]:
if raw.lower() == "all":
return list(FRAMEWORK_CHOICES)
@@ -122,7 +85,13 @@ def parse_frameworks(raw: str) -> list[str]:
return selected if selected else list(FRAMEWORK_CHOICES)
-def run_mahout(num_qubits: int, total_batches: int, batch_size: int, prefetch:
int):
+def run_mahout(
+ num_qubits: int,
+ total_batches: int,
+ batch_size: int,
+ prefetch: int,
+ encoding_method: str = "amplitude",
+):
try:
engine = QdpEngine(0)
except Exception as exc:
@@ -134,9 +103,11 @@ def run_mahout(num_qubits: int, total_batches: int,
batch_size: int, prefetch: i
start = time.perf_counter()
processed = 0
- for batch in prefetched_batches(total_batches, batch_size, vector_len,
prefetch):
- normalized = normalize_batch(batch)
- qtensor = engine.encode(normalized, num_qubits, "amplitude")
+ for batch in prefetched_batches(
+ total_batches, batch_size, vector_len, prefetch, encoding_method
+ ):
+ normalized = normalize_batch(batch, encoding_method)
+ qtensor = engine.encode(normalized, num_qubits, encoding_method)
_ = torch.utils.dlpack.from_dlpack(qtensor)
processed += normalized.shape[0]
@@ -266,6 +237,13 @@ def main():
"(pennylane,qiskit-init,qiskit-statevector,mahout) or 'all'."
),
)
+ parser.add_argument(
+ "--encoding-method",
+ type=str,
+ default="amplitude",
+ choices=["amplitude", "basis"],
+ help="Encoding method to use for Mahout (amplitude or basis).",
+ )
args = parser.parse_args()
if not torch.cuda.is_available():
@@ -285,6 +263,7 @@ def main():
print(f" Batches : {args.batches}")
print(f" Prefetch : {args.prefetch}")
print(f" Frameworks : {', '.join(frameworks)}")
+ print(f" Encode method: {args.encoding_method}")
bytes_per_vec = vector_len * 8
print(f" Generated {total_vectors} samples")
print(
@@ -329,7 +308,11 @@ def main():
print()
print("[Mahout] Full Pipeline (DataLoader -> GPU)...")
t_mahout, l_mahout = run_mahout(
- args.qubits, args.batches, args.batch_size, args.prefetch
+ args.qubits,
+ args.batches,
+ args.batch_size,
+ args.prefetch,
+ args.encoding_method,
)
print()
diff --git a/qdp/qdp-python/benchmark/benchmark_latency_pytorch.py
b/qdp/qdp-python/benchmark/benchmark_latency_pytorch.py
index a0e7b1497..e1c842a8f 100755
--- a/qdp/qdp-python/benchmark/benchmark_latency_pytorch.py
+++ b/qdp/qdp-python/benchmark/benchmark_latency_pytorch.py
@@ -25,14 +25,13 @@ instead of NumPy arrays to test the zero-copy optimization.
from __future__ import annotations
import argparse
-import queue
-import threading
import time
import numpy as np
import torch
from _qdp import QdpEngine
+from utils import build_sample, normalize_batch_torch, prefetched_batches_torch
BAR = "=" * 70
SEP = "-" * 70
@@ -43,50 +42,12 @@ def sync_cuda() -> None:
torch.cuda.synchronize()
-def build_sample(seed: int, vector_len: int) -> np.ndarray:
- mask = np.uint64(vector_len - 1)
- scale = 1.0 / vector_len
- idx = np.arange(vector_len, dtype=np.uint64)
- mixed = (idx + np.uint64(seed)) & mask
- return mixed.astype(np.float64) * scale
-
-
-def prefetched_batches_torch(
- total_batches: int, batch_size: int, vector_len: int, prefetch: int
-):
- """Generate batches as PyTorch tensors."""
- q: queue.Queue[torch.Tensor | None] = queue.Queue(maxsize=prefetch)
-
- def producer():
- for batch_idx in range(total_batches):
- base = batch_idx * batch_size
- batch = [build_sample(base + i, vector_len) for i in
range(batch_size)]
- # Convert to PyTorch tensor (CPU, float64, contiguous)
- batch_tensor = torch.tensor(
- np.stack(batch), dtype=torch.float64, device="cpu"
- )
- assert batch_tensor.is_contiguous(), "Tensor should be contiguous"
- q.put(batch_tensor)
- q.put(None)
-
- threading.Thread(target=producer, daemon=True).start()
-
- while True:
- batch = q.get()
- if batch is None:
- break
- yield batch
-
-
-def normalize_batch_torch(batch: torch.Tensor) -> torch.Tensor:
- """Normalize PyTorch tensor batch."""
- norms = torch.norm(batch, dim=1, keepdim=True)
- norms = torch.clamp(norms, min=1e-10) # Avoid division by zero
- return batch / norms
-
-
def run_mahout_pytorch(
- num_qubits: int, total_batches: int, batch_size: int, prefetch: int
+ num_qubits: int,
+ total_batches: int,
+ batch_size: int,
+ prefetch: int,
+ encoding_method: str = "amplitude",
):
"""Run Mahout benchmark with PyTorch tensor input."""
try:
@@ -101,10 +62,10 @@ def run_mahout_pytorch(
processed = 0
for batch_tensor in prefetched_batches_torch(
- total_batches, batch_size, vector_len, prefetch
+ total_batches, batch_size, vector_len, prefetch, encoding_method
):
- normalized = normalize_batch_torch(batch_tensor)
- qtensor = engine.encode(normalized, num_qubits, "amplitude")
+ normalized = normalize_batch_torch(batch_tensor, encoding_method)
+ qtensor = engine.encode(normalized, num_qubits, encoding_method)
_ = torch.utils.dlpack.from_dlpack(qtensor)
processed += normalized.shape[0]
@@ -116,7 +77,11 @@ def run_mahout_pytorch(
def run_mahout_numpy(
- num_qubits: int, total_batches: int, batch_size: int, prefetch: int
+ num_qubits: int,
+ total_batches: int,
+ batch_size: int,
+ prefetch: int,
+ encoding_method: str = "amplitude",
):
"""Run Mahout benchmark with NumPy array input (for comparison)."""
try:
@@ -133,13 +98,19 @@ def run_mahout_numpy(
# Use the same data generation but keep as NumPy
for batch_idx in range(total_batches):
base = batch_idx * batch_size
- batch = [build_sample(base + i, vector_len) for i in range(batch_size)]
+ batch = [
+ build_sample(base + i, vector_len, encoding_method)
+ for i in range(batch_size)
+ ]
batch_np = np.stack(batch)
- norms = np.linalg.norm(batch_np, axis=1, keepdims=True)
- norms[norms == 0] = 1.0
- normalized = batch_np / norms
-
- qtensor = engine.encode(normalized, num_qubits, "amplitude")
+ if encoding_method == "basis":
+ normalized = batch_np
+ else:
+ norms = np.linalg.norm(batch_np, axis=1, keepdims=True)
+ norms[norms == 0] = 1.0
+ normalized = batch_np / norms
+
+ qtensor = engine.encode(normalized, num_qubits, encoding_method)
_ = torch.utils.dlpack.from_dlpack(qtensor)
processed += normalized.shape[0]
@@ -165,6 +136,13 @@ def main():
parser.add_argument(
"--prefetch", type=int, default=16, help="CPU-side prefetch depth."
)
+ parser.add_argument(
+ "--encoding-method",
+ type=str,
+ default="amplitude",
+ choices=["amplitude", "basis"],
+ help="Encoding method to use for Mahout (amplitude or basis).",
+ )
args = parser.parse_args()
if not torch.cuda.is_available():
@@ -191,13 +169,13 @@ def main():
print()
print("[Mahout-PyTorch] PyTorch Tensor Input (Zero-Copy Optimization)...")
t_pytorch, l_pytorch = run_mahout_pytorch(
- args.qubits, args.batches, args.batch_size, args.prefetch
+ args.qubits, args.batches, args.batch_size, args.prefetch,
args.encoding_method
)
print()
print("[Mahout-NumPy] NumPy Array Input (Baseline)...")
t_numpy, l_numpy = run_mahout_numpy(
- args.qubits, args.batches, args.batch_size, args.prefetch
+ args.qubits, args.batches, args.batch_size, args.prefetch,
args.encoding_method
)
print()
diff --git a/qdp/qdp-python/benchmark/benchmark_numpy_io.py
b/qdp/qdp-python/benchmark/benchmark_numpy_io.py
index 2a3f3774a..d79661b39 100644
--- a/qdp/qdp-python/benchmark/benchmark_numpy_io.py
+++ b/qdp/qdp-python/benchmark/benchmark_numpy_io.py
@@ -40,6 +40,7 @@ import numpy as np
import torch
from _qdp import QdpEngine
+from utils import normalize_batch
BAR = "=" * 70
SEP = "-" * 70
@@ -53,18 +54,27 @@ except ImportError:
def generate_test_data(
- num_samples: int, sample_size: int, seed: int = 42
+ num_samples: int,
+ sample_size: int,
+ encoding_method: str = "amplitude",
+ seed: int = 42,
) -> np.ndarray:
"""Generate deterministic test data."""
rng = np.random.RandomState(seed)
- data = rng.randn(num_samples, sample_size).astype(np.float64)
- # Normalize each sample
- norms = np.linalg.norm(data, axis=1, keepdims=True)
- norms[norms == 0] = 1.0
- return data / norms
+ if encoding_method == "basis":
+ # Basis encoding: single index per sample
+ data = rng.randint(0, sample_size, size=(num_samples,
1)).astype(np.float64)
+ else:
+ # Amplitude encoding: full vectors (using Gaussian distribution)
+ data = rng.randn(num_samples, sample_size).astype(np.float64)
+ # Normalize each sample
+ data = normalize_batch(data, encoding_method)
+ return data
-def run_mahout_numpy(num_qubits: int, num_samples: int, npy_path: str):
+def run_mahout_numpy(
+ num_qubits: int, num_samples: int, npy_path: str, encoding_method: str =
"amplitude"
+):
"""Benchmark Mahout with NumPy file I/O."""
print("\n[Mahout + NumPy] Loading and encoding...")
@@ -80,7 +90,7 @@ def run_mahout_numpy(num_qubits: int, num_samples: int,
npy_path: str):
try:
# Use the unified encode API with file path
- qtensor = engine.encode(npy_path, num_qubits, "amplitude")
+ qtensor = engine.encode(npy_path, num_qubits, encoding_method)
tensor = torch.utils.dlpack.from_dlpack(qtensor)
# Small computation to ensure GPU has processed the data
@@ -181,6 +191,13 @@ def main():
default="all",
help="Comma-separated list: mahout,pennylane or 'all'",
)
+ parser.add_argument(
+ "--encoding-method",
+ type=str,
+ default="amplitude",
+ choices=["amplitude", "basis"],
+ help="Encoding method to use for Mahout (amplitude or basis).",
+ )
args = parser.parse_args()
# Parse frameworks
@@ -204,7 +221,7 @@ def main():
# Generate test data
print("\nGenerating test data...")
- data = generate_test_data(num_samples, sample_size)
+ data = generate_test_data(num_samples, sample_size, args.encoding_method)
# Save to NumPy file
if args.output:
@@ -223,7 +240,7 @@ def main():
if "mahout" in frameworks:
t_total, throughput, avg_per_sample = run_mahout_numpy(
- num_qubits, num_samples, npy_path
+ num_qubits, num_samples, npy_path, args.encoding_method
)
if throughput > 0:
results["Mahout"] = {
diff --git a/qdp/qdp-python/benchmark/benchmark_throughput.py
b/qdp/qdp-python/benchmark/benchmark_throughput.py
index 54b04fd1f..8c0305402 100644
--- a/qdp/qdp-python/benchmark/benchmark_throughput.py
+++ b/qdp/qdp-python/benchmark/benchmark_throughput.py
@@ -28,14 +28,13 @@ Run:
"""
import argparse
-import queue
-import threading
import time
import numpy as np
import torch
from _qdp import QdpEngine
+from utils import normalize_batch, prefetched_batches
BAR = "=" * 70
SEP = "-" * 70
@@ -57,41 +56,6 @@ except ImportError:
HAS_QISKIT = False
-def build_sample(seed: int, vector_len: int) -> np.ndarray:
- mask = np.uint64(vector_len - 1)
- scale = 1.0 / vector_len
- idx = np.arange(vector_len, dtype=np.uint64)
- mixed = (idx + np.uint64(seed)) & mask
- return mixed.astype(np.float64) * scale
-
-
-def prefetched_batches(
- total_batches: int, batch_size: int, vector_len: int, prefetch: int
-):
- q: queue.Queue[np.ndarray | None] = queue.Queue(maxsize=prefetch)
-
- def producer():
- for batch_idx in range(total_batches):
- base = batch_idx * batch_size
- batch = [build_sample(base + i, vector_len) for i in
range(batch_size)]
- q.put(np.stack(batch))
- q.put(None)
-
- threading.Thread(target=producer, daemon=True).start()
-
- while True:
- batch = q.get()
- if batch is None:
- break
- yield batch
-
-
-def normalize_batch(batch: np.ndarray) -> np.ndarray:
- norms = np.linalg.norm(batch, axis=1, keepdims=True)
- norms[norms == 0] = 1.0
- return batch / norms
-
-
def parse_frameworks(raw: str) -> list[str]:
if raw.lower() == "all":
return list(FRAMEWORK_CHOICES)
@@ -112,7 +76,13 @@ def parse_frameworks(raw: str) -> list[str]:
return selected if selected else list(FRAMEWORK_CHOICES)
-def run_mahout(num_qubits: int, total_batches: int, batch_size: int, prefetch:
int):
+def run_mahout(
+ num_qubits: int,
+ total_batches: int,
+ batch_size: int,
+ prefetch: int,
+ encoding_method: str = "amplitude",
+):
try:
engine = QdpEngine(0)
except Exception as exc:
@@ -124,10 +94,12 @@ def run_mahout(num_qubits: int, total_batches: int,
batch_size: int, prefetch: i
processed = 0
for batch in prefetched_batches(
- total_batches, batch_size, 1 << num_qubits, prefetch
+ total_batches, batch_size, 1 << num_qubits, prefetch, encoding_method
):
- normalized = np.ascontiguousarray(normalize_batch(batch),
dtype=np.float64)
- qtensor = engine.encode(normalized, num_qubits)
+ normalized = np.ascontiguousarray(
+ normalize_batch(batch, encoding_method), dtype=np.float64
+ )
+ qtensor = engine.encode(normalized, num_qubits, encoding_method)
tensor = torch.from_dlpack(qtensor).abs().to(torch.float32)
_ = tensor.sum()
processed += normalized.shape[0]
@@ -240,6 +212,13 @@ def main():
"(pennylane,qiskit,mahout) or 'all'."
),
)
+ parser.add_argument(
+ "--encoding-method",
+ type=str,
+ default="amplitude",
+ choices=["amplitude", "basis"],
+ help="Encoding method to use for Mahout (amplitude or basis).",
+ )
args = parser.parse_args()
try:
@@ -256,6 +235,7 @@ def main():
print(f" Batches : {args.batches}")
print(f" Prefetch : {args.prefetch}")
print(f" Frameworks : {', '.join(frameworks)}")
+ print(f" Encode method: {args.encoding_method}")
bytes_per_vec = vector_len * 8
print(f" Generated {total_vectors} samples")
print(
@@ -290,7 +270,11 @@ def main():
print()
print("[Mahout] Full Pipeline (DataLoader -> GPU)...")
t_mahout, th_mahout = run_mahout(
- args.qubits, args.batches, args.batch_size, args.prefetch
+ args.qubits,
+ args.batches,
+ args.batch_size,
+ args.prefetch,
+ args.encoding_method,
)
print()
diff --git a/qdp/qdp-python/benchmark/utils.py
b/qdp/qdp-python/benchmark/utils.py
new file mode 100644
index 000000000..753405831
--- /dev/null
+++ b/qdp/qdp-python/benchmark/utils.py
@@ -0,0 +1,222 @@
+#!/usr/bin/env python3
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Shared utility functions for QDP benchmarks.
+
+This module provides common data generation and normalization functions
+used across multiple benchmark scripts.
+"""
+
+from __future__ import annotations
+
+import queue
+import threading
+
+import numpy as np
+import torch
+
+
+def build_sample(
+ seed: int, vector_len: int, encoding_method: str = "amplitude"
+) -> np.ndarray:
+ """
+ Build a single sample vector for benchmarking.
+
+ Args:
+ seed: Seed value used to generate deterministic data.
+ vector_len: Length of the vector (2^num_qubits for amplitude encoding).
+ encoding_method: Either "amplitude" or "basis".
+
+ Returns:
+ NumPy array containing the sample data.
+ """
+ if encoding_method == "basis":
+ # Basis encoding: single index per sample
+ mask = np.uint64(vector_len - 1)
+ idx = np.uint64(seed) & mask
+ return np.array([idx], dtype=np.float64)
+ else:
+ # Amplitude encoding: full vector
+ mask = np.uint64(vector_len - 1)
+ scale = 1.0 / vector_len
+ idx = np.arange(vector_len, dtype=np.uint64)
+ mixed = (idx + np.uint64(seed)) & mask
+ return mixed.astype(np.float64) * scale
+
+
+def generate_batch_data(
+ n_samples: int,
+ dim: int,
+ encoding_method: str = "amplitude",
+ seed: int = 42,
+) -> np.ndarray:
+ """
+ Generate batch data for benchmarking.
+
+ Args:
+ n_samples: Number of samples to generate.
+ dim: Dimension of each sample (2^num_qubits for amplitude encoding).
+ encoding_method: Either "amplitude" or "basis".
+ seed: Random seed for reproducibility.
+
+ Returns:
+ NumPy array of shape (n_samples, dim) for amplitude encoding
+ or (n_samples, 1) for basis encoding.
+ """
+ np.random.seed(seed)
+ if encoding_method == "basis":
+ # Basis encoding: single index per sample
+ return np.random.randint(0, dim, size=(n_samples,
1)).astype(np.float64)
+ else:
+ # Amplitude encoding: full vectors
+ return np.random.rand(n_samples, dim).astype(np.float64)
+
+
+def normalize_batch(
+ batch: np.ndarray, encoding_method: str = "amplitude"
+) -> np.ndarray:
+ """
+ Normalize a batch of vectors (L2 normalization).
+
+ Args:
+ batch: NumPy array of shape (batch_size, vector_len).
+ encoding_method: Either "amplitude" or "basis".
+
+ Returns:
+ Normalized batch. For basis encoding, returns the input unchanged.
+ """
+ if encoding_method == "basis":
+ # Basis encoding doesn't need normalization (indices)
+ return batch
+ # Amplitude encoding: normalize vectors
+ norms = np.linalg.norm(batch, axis=1, keepdims=True)
+ norms[norms == 0] = 1.0
+ return batch / norms
+
+
+def normalize_batch_torch(
+ batch: torch.Tensor, encoding_method: str = "amplitude"
+) -> torch.Tensor:
+ """
+ Normalize a batch of PyTorch tensors (L2 normalization).
+
+ Args:
+ batch: PyTorch tensor of shape (batch_size, vector_len).
+ encoding_method: Either "amplitude" or "basis".
+
+ Returns:
+ Normalized batch. For basis encoding, returns the input unchanged.
+ """
+ if encoding_method == "basis":
+ # Basis encoding doesn't need normalization (indices)
+ return batch
+ # Amplitude encoding: normalize vectors
+ norms = torch.norm(batch, dim=1, keepdim=True)
+ norms = torch.clamp(norms, min=1e-10) # Avoid division by zero
+ return batch / norms
+
+
+def prefetched_batches(
+ total_batches: int,
+ batch_size: int,
+ vector_len: int,
+ prefetch: int,
+ encoding_method: str = "amplitude",
+):
+ """
+ Generate prefetched batches of NumPy arrays for benchmarking.
+
+ Uses a background thread to prefetch batches and keep the GPU fed.
+
+ Args:
+ total_batches: Total number of batches to generate.
+ batch_size: Number of samples per batch.
+ vector_len: Length of each vector (2^num_qubits).
+ prefetch: Number of batches to prefetch.
+ encoding_method: Either "amplitude" or "basis".
+
+ Yields:
+ NumPy arrays of shape (batch_size, vector_len) or (batch_size, 1).
+ """
+ q: queue.Queue[np.ndarray | None] = queue.Queue(maxsize=prefetch)
+
+ def producer():
+ for batch_idx in range(total_batches):
+ base = batch_idx * batch_size
+ batch = [
+ build_sample(base + i, vector_len, encoding_method)
+ for i in range(batch_size)
+ ]
+ q.put(np.stack(batch))
+ q.put(None)
+
+ threading.Thread(target=producer, daemon=True).start()
+
+ while True:
+ batch = q.get()
+ if batch is None:
+ break
+ yield batch
+
+
+def prefetched_batches_torch(
+ total_batches: int,
+ batch_size: int,
+ vector_len: int,
+ prefetch: int,
+ encoding_method: str = "amplitude",
+):
+ """
+ Generate prefetched batches as PyTorch tensors for benchmarking.
+
+ Uses a background thread to prefetch batches and keep the GPU fed.
+
+ Args:
+ total_batches: Total number of batches to generate.
+ batch_size: Number of samples per batch.
+ vector_len: Length of each vector (2^num_qubits).
+ prefetch: Number of batches to prefetch.
+ encoding_method: Either "amplitude" or "basis".
+
+ Yields:
+ PyTorch tensors of shape (batch_size, vector_len) or (batch_size, 1).
+ """
+ q: queue.Queue[torch.Tensor | None] = queue.Queue(maxsize=prefetch)
+
+ def producer():
+ for batch_idx in range(total_batches):
+ base = batch_idx * batch_size
+ batch = [
+ build_sample(base + i, vector_len, encoding_method)
+ for i in range(batch_size)
+ ]
+ # Convert to PyTorch tensor (CPU, float64, contiguous)
+ batch_tensor = torch.tensor(
+ np.stack(batch), dtype=torch.float64, device="cpu"
+ )
+ assert batch_tensor.is_contiguous(), "Tensor should be contiguous"
+ q.put(batch_tensor)
+ q.put(None)
+
+ threading.Thread(target=producer, daemon=True).start()
+
+ while True:
+ batch = q.get()
+ if batch is None:
+ break
+ yield batch