This is an automated email from the ASF dual-hosted git repository.
hcr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/mahout.git
The following commit(s) were added to refs/heads/main by this push:
new e4c3e5804 [QDP] Add observability tools to diagnose pipeline
performance (#945)
e4c3e5804 is described below
commit e4c3e580415d677c505c740594fa2c1a2a8e963e
Author: KUAN-HAO HUANG <[email protected]>
AuthorDate: Wed Jan 28 22:43:48 2026 +0800
[QDP] Add observability tools to diagnose pipeline performance (#945)
* Add Observability Tools
* update and use a safer approach
---
qdp/Cargo.lock | 124 +++++++
qdp/docs/observability/OBSERVABILITY_USAGE.md | 471 ++++++++++++++++++++++++++
qdp/qdp-core/Cargo.toml | 2 +
qdp/qdp-core/examples/observability_test.rs | 120 +++++++
qdp/qdp-core/src/error.rs | 1 +
qdp/qdp-core/src/gpu/buffer_pool.rs | 43 ++-
qdp/qdp-core/src/gpu/cuda_ffi.rs | 30 ++
qdp/qdp-core/src/gpu/mod.rs | 8 +
qdp/qdp-core/src/gpu/overlap_tracker.rs | 465 +++++++++++++++++++++++++
qdp/qdp-core/src/gpu/pipeline.rs | 207 ++++++++---
qdp/qdp-core/src/gpu/pool_metrics.rs | 242 +++++++++++++
qdp/qdp-python/Cargo.toml | 1 +
qdp/qdp-python/src/lib.rs | 6 +
13 files changed, 1677 insertions(+), 43 deletions(-)
diff --git a/qdp/Cargo.lock b/qdp/Cargo.lock
index dc937c4ea..9b6e5bd14 100644
--- a/qdp/Cargo.lock
+++ b/qdp/Cargo.lock
@@ -66,6 +66,56 @@ dependencies = [
"libc",
]
+[[package]]
+name = "anstream"
+version = "0.6.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a"
+dependencies = [
+ "anstyle",
+ "anstyle-parse",
+ "anstyle-query",
+ "anstyle-wincon",
+ "colorchoice",
+ "is_terminal_polyfill",
+ "utf8parse",
+]
+
+[[package]]
+name = "anstyle"
+version = "1.0.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78"
+
+[[package]]
+name = "anstyle-parse"
+version = "0.2.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2"
+dependencies = [
+ "utf8parse",
+]
+
+[[package]]
+name = "anstyle-query"
+version = "1.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc"
+dependencies = [
+ "windows-sys",
+]
+
+[[package]]
+name = "anstyle-wincon"
+version = "3.0.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d"
+dependencies = [
+ "anstyle",
+ "once_cell_polyfill",
+ "windows-sys",
+]
+
[[package]]
name = "anyhow"
version = "1.0.100"
@@ -444,6 +494,12 @@ dependencies = [
"inout",
]
+[[package]]
+name = "colorchoice"
+version = "1.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
+
[[package]]
name = "const-random"
version = "0.1.18"
@@ -613,6 +669,29 @@ version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
+[[package]]
+name = "env_filter"
+version = "0.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1bf3c259d255ca70051b30e2e95b5446cdb8949ac4cd22c0d7fd634d89f568e2"
+dependencies = [
+ "log",
+ "regex",
+]
+
+[[package]]
+name = "env_logger"
+version = "0.11.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "13c863f0904021b108aa8b2f55046443e6b1ebde8fd4a15c399893aae4fa069f"
+dependencies = [
+ "anstream",
+ "anstyle",
+ "env_filter",
+ "jiff",
+ "log",
+]
+
[[package]]
name = "equivalent"
version = "1.0.2"
@@ -797,6 +876,12 @@ version = "3.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02"
+[[package]]
+name = "is_terminal_polyfill"
+version = "1.70.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695"
+
[[package]]
name = "itertools"
version = "0.12.1"
@@ -812,6 +897,30 @@ version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2"
+[[package]]
+name = "jiff"
+version = "0.2.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e67e8da4c49d6d9909fe03361f9b620f58898859f5c7aded68351e85e71ecf50"
+dependencies = [
+ "jiff-static",
+ "log",
+ "portable-atomic",
+ "portable-atomic-util",
+ "serde_core",
+]
+
+[[package]]
+name = "jiff-static"
+version = "0.2.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e0c84ee7f197eca9a86c6fd6cb771e55eb991632f15f2bc3ca6ec838929e6e78"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "jobserver"
version = "0.1.34"
@@ -1134,6 +1243,12 @@ version = "1.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
+[[package]]
+name = "once_cell_polyfill"
+version = "1.70.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe"
+
[[package]]
name = "ordered-float"
version = "2.10.1"
@@ -1512,6 +1627,8 @@ dependencies = [
"arrow",
"bytes",
"cudarc",
+ "env_logger",
+ "log",
"ndarray 0.16.1",
"ndarray-npy",
"nvtx",
@@ -1537,6 +1654,7 @@ dependencies = [
name = "qdp-python"
version = "0.1.0"
dependencies = [
+ "env_logger",
"numpy",
"pyo3",
"qdp-core",
@@ -1983,6 +2101,12 @@ version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7264e107f553ccae879d21fbea1d6724ac785e8c3bfc762137959b5802826ef3"
+[[package]]
+name = "utf8parse"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
+
[[package]]
name = "version_check"
version = "0.9.5"
diff --git a/qdp/docs/observability/OBSERVABILITY_USAGE.md
b/qdp/docs/observability/OBSERVABILITY_USAGE.md
new file mode 100644
index 000000000..6677226d3
--- /dev/null
+++ b/qdp/docs/observability/OBSERVABILITY_USAGE.md
@@ -0,0 +1,471 @@
+# Observability Tools Usage Guide
+
+This document describes how to use the observability tools to diagnose
pipeline performance and verify optimization targets.
+
+**Note for Python Users**: The Python bindings automatically initialize Rust
logging when the module is imported. You only need to set the `RUST_LOG`
environment variable before importing `qumat.qdp`. See the [Logging
Configuration](#logging-configuration) section for details.
+
+## Table of Contents
+
+- [Overview](#overview)
+- [Enabling Observability](#enabling-observability)
+- [Logging Configuration](#logging-configuration)
+- [Pool Metrics](#pool-metrics)
+- [Overlap Tracking](#overlap-tracking)
+- [Performance Impact](#performance-impact)
+- [Usage Examples](#usage-examples)
+- [Integration with Benchmarking](#integration-with-benchmarking)
+- [Troubleshooting](#troubleshooting)
+- [API Reference](#api-reference)
+- [Related Documentation](#related-documentation)
+
+## Overview
+
+The observability tools provide two main features:
+
+1. **Pool Metrics**: Track pinned buffer pool utilization to diagnose pool
starvation
+2. **Overlap Tracking**: Measure H2D copy and compute overlap to verify >60%
overlap target
+
+Both features are **optional** and can be enabled via environment variables.
When disabled, they have **zero performance overhead**.
+
+**Important**: Observability features are only active when the dual-stream
pipeline is used. The pipeline is automatically used for data sizes >= 1MB
(131,072 elements for float64, which corresponds to 17 qubits or more).
+
+## Enabling Observability
+
+### Pool Metrics
+
+Enable pool utilization metrics by setting the `QDP_ENABLE_POOL_METRICS`
environment variable:
+
+```bash
+export QDP_ENABLE_POOL_METRICS=1
+# or
+export QDP_ENABLE_POOL_METRICS=true
+```
+
+### Overlap Tracking
+
+Enable H2D overlap tracking by setting the `QDP_ENABLE_OVERLAP_TRACKING`
environment variable:
+
+```bash
+export QDP_ENABLE_OVERLAP_TRACKING=1
+# or
+export QDP_ENABLE_OVERLAP_TRACKING=true
+```
+
+### Both Features
+
+You can enable both features simultaneously:
+
+```bash
+export QDP_ENABLE_POOL_METRICS=1
+export QDP_ENABLE_OVERLAP_TRACKING=1
+```
+
+## Logging Configuration
+
+The observability tools use the Rust `log` crate. To see the output, you need
to configure a logger. The Python bindings automatically initialize
`env_logger` when the module is imported, so you only need to set the
`RUST_LOG` environment variable.
+
+### Using Python Bindings (Recommended for Python Users)
+
+The Python bindings (`qumat.qdp`) automatically initialize the Rust logging
system when the module is imported. Simply set the `RUST_LOG` environment
variable before importing:
+
+```python
+import os
+
+# Set environment variables BEFORE importing qumat.qdp
+os.environ['RUST_LOG'] = 'info' # or 'debug' for more detailed output
+os.environ['QDP_ENABLE_POOL_METRICS'] = '1'
+os.environ['QDP_ENABLE_OVERLAP_TRACKING'] = '1'
+
+# Now import - logging will be initialized automatically
+from qumat import qdp
+import numpy as np
+
+# Your QDP code here
+# Note: Use >= 18 qubits (2MB) to ensure pipeline is triggered
+engine = qdp.QdpEngine(0)
+data = np.random.rand(262144).astype(np.float64) # 18 qubits = 262144
elements = 2MB
+data = data / np.linalg.norm(data)
+result = engine.encode(data, num_qubits=18, encoding_method='amplitude')
+```
+
+Or set environment variables in your shell:
+
+```bash
+export RUST_LOG=info
+export QDP_ENABLE_POOL_METRICS=1
+export QDP_ENABLE_OVERLAP_TRACKING=1
+
+python your_script.py
+```
+
+**Note**: The Python `logging` module is separate from Rust's logging system.
Rust log messages will appear in stderr, not through Python's logging system.
+
+### Using Rust Examples
+
+For Rust examples, you need to manually initialize `env_logger`:
+
+```rust
+// In your example main function:
+env_logger::Builder::from_default_env()
+ .init(); // Don't override filter_level - let RUST_LOG control it
+```
+
+Then run with logging enabled:
+
+```bash
+RUST_LOG=info cargo run --example observability_test --release
+```
+
+## Pool Metrics
+
+### What It Tracks
+
+- **min_available**: Minimum number of buffers available during any acquire
operation
+- **max_available**: Maximum number of buffers available during any acquire
operation
+- **total_acquires**: Total number of buffer acquire operations
+- **total_waits**: Number of times a thread had to wait because the pool was
empty
+- **starvation_ratio**: Ratio of waits to acquires (indicates pool starvation)
+- **avg_wait_time_ns**: Average wait time in nanoseconds
+
+### Interpreting Results
+
+**Good Performance**:
+- `starvation_ratio < 0.05` (less than 5% of acquires had to wait)
+- `avg_wait_time_ns` is small (< 1ms typically)
+
+**Pool Starvation Detected**:
+- `starvation_ratio > 0.05` (more than 5% of acquires had to wait)
+- The tool will automatically log a warning
+- **Action**: Consider increasing `QDP_PINNED_POOL_SIZE` (when implemented in
future PR)
+
+### Example Output
+
+```
+[INFO] Pool Utilization: min=0, max=2, acquires=100, waits=2, starvation=2.00%
+```
+
+If starvation is detected (i.e. the starvation ratio exceeds the 5% threshold), a warning is logged, for example:
+
+```
+[WARN] Pool starvation detected: 6.0% of acquires had to wait. Consider increasing pool size.
+```
+
+## Overlap Tracking
+
+### What It Tracks
+
+- **H2D Overlap Ratio**: Percentage of time that copy and compute operations
overlap
+- Measured per chunk (logged for chunk 0 and every 10th chunk to avoid
excessive output)
+
+### Interpreting Results
+
+**Target**: H2D overlap should be **>60%** for optimal performance.
+
+**Good Performance**:
+- Overlap > 60%: Pipeline is efficiently overlapping copy and compute
+- Overlap percentage is logged at `INFO` level
+- Detailed timing information (copy time, compute time, overlap time) is
logged at `DEBUG` level
+
+**Below Target**:
+- Overlap < 60%: Pipeline is not achieving optimal overlap
+- Overlap percentage is logged at `INFO` level
+- A warning message is logged at `WARN` level
+- Detailed timing information is available at `DEBUG` level for troubleshooting
+- **Possible causes**:
+ - Regular synchronization points (will be addressed in future PR)
+ - Pool size too small
+ - Chunk size not optimal for hardware
+
+### Example Output
+
+**Good Overlap**:
+```
+[INFO] Chunk 0: H2D overlap = 68.5%
+[INFO] Chunk 10: H2D overlap = 72.3%
+```
+
+**Below Target**:
+```
+[INFO] Chunk 0: H2D overlap = 45.2%
+[WARN] Chunk 0: Overlap below target (60%), current = 45.2%
+```
+
+**With DEBUG level** (shows detailed timing information):
+```
+[INFO] Chunk 0: H2D overlap = 68.5%
+[DEBUG] Chunk 0: H2D overlap details - copy=1.230ms, compute=2.450ms,
overlap=0.840ms, ratio=68.5%
+```
+
+## Performance Impact
+
+### When Disabled
+
+- **Zero overhead**: All observability code is conditionally compiled or uses
fast checks
+- No CUDA events created
+- No atomic operations performed
+- Safe to leave in production code
+
+### When Enabled
+
+- **Pool Metrics**: < 1% CPU overhead (atomic operations with Relaxed ordering)
+- **Overlap Tracking**: < 5% CPU overhead in debug mode (CUDA event queries)
+- **Combined**: < 5% CPU overhead when both enabled
+
+## Usage Examples
+
+### Example 1: Using the Rust Test Example
+
+A test example is provided to demonstrate observability features:
+
+```bash
+# Build the example
+# Note: Run from the qdp directory (or use: cd qdp)
+cargo build -p qdp-core --example observability_test --release
+
+# Run without observability (baseline)
+./target/release/examples/observability_test
+
+# Run with pool metrics only
+QDP_ENABLE_POOL_METRICS=1 RUST_LOG=info
./target/release/examples/observability_test
+
+# Run with overlap tracking only
+QDP_ENABLE_OVERLAP_TRACKING=1 RUST_LOG=info
./target/release/examples/observability_test
+
+# Run with both features
+QDP_ENABLE_POOL_METRICS=1 QDP_ENABLE_OVERLAP_TRACKING=1 RUST_LOG=info
./target/release/examples/observability_test
+```
+
+### Example 2: Diagnose Pool Starvation (Python)
+
+```python
+import os
+os.environ['QDP_ENABLE_POOL_METRICS'] = '1'
+os.environ['RUST_LOG'] = 'info'
+
+from qumat import qdp
+import numpy as np
+
+engine = qdp.QdpEngine(0)
+# Create data that triggers pipeline (>= 1MB = 131072 elements = 17 qubits)
+# Using 18 qubits (262144 elements = 2MB) to ensure pipeline is used
+data = np.random.rand(262144).astype(np.float64)
+data = data / np.linalg.norm(data)
+result = engine.encode(data, num_qubits=18, encoding_method='amplitude')
+# Check stderr for starvation warnings
+```
+
+Or using shell environment variables:
+
+```bash
+# Enable pool metrics
+export QDP_ENABLE_POOL_METRICS=1
+export RUST_LOG=info
+
+# Run your workload
+python your_script.py
+
+# Check stderr output for starvation warnings
+```
+
+### Example 3: Verify Overlap Target (Python)
+
+```python
+import os
+os.environ['QDP_ENABLE_OVERLAP_TRACKING'] = '1'
+os.environ['RUST_LOG'] = 'info' # or 'debug' for detailed timing
+
+from qumat import qdp
+import numpy as np
+
+engine = qdp.QdpEngine(0)
+# Create data that triggers pipeline (>= 1MB = 131072 elements = 17 qubits)
+# Using 18 qubits (262144 elements = 2MB) to ensure pipeline is used
+data = np.random.rand(262144).astype(np.float64)
+data = data / np.linalg.norm(data)
+result = engine.encode(data, num_qubits=18, encoding_method='amplitude')
+# Check stderr for overlap percentages
+```
+
+Or using shell environment variables:
+
+```bash
+# Enable overlap tracking
+export QDP_ENABLE_OVERLAP_TRACKING=1
+export RUST_LOG=info # or debug for detailed timing
+
+# Run your workload
+python your_script.py
+
+# Check stderr output for overlap percentages
+```
+
+### Example 4: Full Observability (Python)
+
+```python
+import os
+os.environ['QDP_ENABLE_POOL_METRICS'] = '1'
+os.environ['QDP_ENABLE_OVERLAP_TRACKING'] = '1'
+os.environ['RUST_LOG'] = 'info' # or 'debug' for more details
+
+from qumat import qdp
+import numpy as np
+
+engine = qdp.QdpEngine(0)
+# Create data that triggers pipeline (>= 1MB = 131072 elements = 17 qubits)
+# Using 18 qubits (262144 elements = 2MB) to ensure pipeline is used
+data = np.random.rand(262144).astype(np.float64)
+data = data / np.linalg.norm(data)
+result = engine.encode(data, num_qubits=18, encoding_method='amplitude')
+# Check stderr for both metrics
+```
+
+Or using shell environment variables:
+
+```bash
+# Enable both features
+export QDP_ENABLE_POOL_METRICS=1
+export QDP_ENABLE_OVERLAP_TRACKING=1
+export RUST_LOG=info # or debug for more details
+
+# Run your workload
+python your_script.py
+
+# Check stderr output for both pool utilization and overlap
+```
+
+## Integration with Benchmarking
+
+When running benchmarks, enable observability to collect data:
+
+**Python Benchmarks**:
+```bash
+# Enable observability
+export QDP_ENABLE_POOL_METRICS=1
+export QDP_ENABLE_OVERLAP_TRACKING=1
+export RUST_LOG=info
+
+# Run throughput benchmark (example - adjust path and arguments as needed)
+python benchmark_throughput.py --qubits 16 --batches 200 --batch-size 64
--frameworks mahout
+
+# Check stderr output for:
+# - Pool utilization summary
+# - Overlap percentages
+# - Any warnings about starvation or low overlap
+```
+
+**Rust Benchmarks**:
+```bash
+# Enable observability
+export QDP_ENABLE_POOL_METRICS=1
+export QDP_ENABLE_OVERLAP_TRACKING=1
+export RUST_LOG=info
+
+# Run benchmark (ensure env_logger is initialized in your benchmark code)
+# Replace 'your_benchmark' with the actual benchmark binary name
+cargo run --release --bin your_benchmark
+```
+
+## Troubleshooting
+
+### No Log Output
+
+**Problem**: Enabled observability but see no log output.
+
+**For Python Users**:
+1. **Set `RUST_LOG` environment variable** before importing `qumat.qdp`:
+ ```python
+ import os
+ os.environ['RUST_LOG'] = 'info' # Must be set BEFORE import
+ from qumat import qdp
+ ```
+ Or set it in your shell:
+ ```bash
+ export RUST_LOG=info
+ python your_script.py
+ ```
+2. **Check stderr, not stdout**: Rust log messages go to stderr, not stdout
+3. **Verify environment variables are set**: Check that
`QDP_ENABLE_POOL_METRICS` and/or `QDP_ENABLE_OVERLAP_TRACKING` are set to `1`
or `true`
+4. **Ensure pipeline is used**: Observability is only active when the
dual-stream pipeline is used (data size >= 1MB threshold, which is 131,072
elements for float64, corresponding to 17 qubits or more)
+
+**For Rust Users**:
+1. Ensure `env_logger` is initialized in your code:
+ ```rust
+ env_logger::Builder::from_default_env().init();
+ ```
+2. Set `RUST_LOG=debug` or `RUST_LOG=info` when running
+3. Check that the code path actually uses the pipeline (observability is only
active in `run_dual_stream_pipeline`)
+
+### Overlap Always 0%
+
+**Problem**: Overlap tracking shows 0% overlap.
+
+**Possible Causes**:
+1. Events not being recorded (check that `record_copy_start/end` and
`record_compute_start/end` are called)
+2. Events not completing (check for CUDA errors)
+3. Compute and copy not actually overlapping (expected in some scenarios)
+
+**Debug Steps**:
+1. Enable debug logging: `RUST_LOG=debug`
+2. Check for CUDA errors in the logs
+3. Verify that both copy and compute operations are actually running
+
+### High Starvation Ratio
+
+**Problem**: Pool starvation ratio > 5%.
+
+**Solutions**:
+1. **Immediate**: This indicates the pool size may be too small
+2. **Future**: When `QDP_PINNED_POOL_SIZE` environment variable is
implemented, increase it
+3. **Workaround**: For now, this is expected behavior with the current fixed
pool size of 2
+
+## API Reference
+
+### PoolMetrics
+
+```rust
+use qdp_core::gpu::PoolMetrics;
+
+// Create metrics
+let metrics = PoolMetrics::new();
+
+// Record operations (automatically done by PinnedBufferPool when enabled)
+// ...
+
+// Generate report
+let report = metrics.report();
+report.print_summary();
+
+// Reset for new measurement period
+metrics.reset();
+```
+
+### OverlapTracker
+
+```rust
+use qdp_core::gpu::OverlapTracker;
+
+// Create tracker (usually done automatically by pipeline)
+let tracker = OverlapTracker::new(pool_size, enabled)?;
+
+// Record events (automatically done by pipeline when enabled)
+tracker.record_copy_start(stream, slot)?;
+tracker.record_copy_end(stream, slot)?;
+tracker.record_compute_start(stream, slot)?;
+tracker.record_compute_end(stream, slot)?;
+
+// Calculate and log overlap
+tracker.log_overlap(chunk_idx)?;
+```
+
+## Related Documentation
+
+- [NVTX Usage Guide](./NVTX_USAGE.md) - NVTX profiling markers
+
+## Future Enhancements
+
+Planned improvements in future PRs:
+
+1. **Dynamic Pool Size**: `QDP_PINNED_POOL_SIZE` environment variable to
adjust pool size
+2. **More Detailed Metrics**: Per-chunk timing breakdowns
+3. **Export to Metrics Format**: Export metrics to Prometheus/StatsD format
+4. **Real-time Monitoring**: Web dashboard for live metrics
diff --git a/qdp/qdp-core/Cargo.toml b/qdp/qdp-core/Cargo.toml
index 0353554b8..407ed2ba4 100644
--- a/qdp/qdp-core/Cargo.toml
+++ b/qdp/qdp-core/Cargo.toml
@@ -16,6 +16,7 @@ ndarray-npy = { workspace = true }
prost = { workspace = true }
bytes = { workspace = true }
tch = { workspace = true, optional = true }
+log = "0.4"
[build-dependencies]
prost-build = { workspace = true }
@@ -31,3 +32,4 @@ pytorch = ["tch"]
[dev-dependencies]
approx = "0.5.1"
+env_logger = "0.11"
diff --git a/qdp/qdp-core/examples/observability_test.rs
b/qdp/qdp-core/examples/observability_test.rs
new file mode 100644
index 000000000..462e8aef7
--- /dev/null
+++ b/qdp/qdp-core/examples/observability_test.rs
@@ -0,0 +1,120 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Observability test example
+// Tests pool metrics and overlap tracking features
+// Run: cargo run -p qdp-core --example observability_test --release
+
+use qdp_core::QdpEngine;
+use std::env;
+
+fn main() {
+ // Initialize logger - respect RUST_LOG environment variable
+ // Don't override the filter level, let RUST_LOG control it
+ env_logger::Builder::from_default_env().init();
+
+ println!("=== QDP Observability Test ===");
+ println!();
+
+ // Check environment variables
+ let enable_pool_metrics = env::var("QDP_ENABLE_POOL_METRICS")
+ .map(|s| s == "1" || s.eq_ignore_ascii_case("true"))
+ .unwrap_or(false);
+ let enable_overlap_tracking = env::var("QDP_ENABLE_OVERLAP_TRACKING")
+ .map(|s| s == "1" || s.eq_ignore_ascii_case("true"))
+ .unwrap_or(false);
+
+ println!("Observability Configuration:");
+ println!(" QDP_ENABLE_POOL_METRICS: {}", enable_pool_metrics);
+ println!(" QDP_ENABLE_OVERLAP_TRACKING: {}", enable_overlap_tracking);
+ println!(
+ " RUST_LOG: {}",
+ env::var("RUST_LOG").unwrap_or_else(|_| "not set".to_string())
+ );
+ println!();
+
+ let engine = match QdpEngine::new(0) {
+ Ok(engine) => engine,
+ Err(e) => {
+ eprintln!("CUDA unavailable or initialization failed: {:?}", e);
+ eprintln!();
+ eprintln!("Note: Observability features require CUDA to be
available.");
+ eprintln!(" If CUDA initialization fails, the pipeline will
not run");
+ eprintln!(" and observability metrics will not be
generated.");
+ eprintln!();
+ eprintln!("To verify the code logic without CUDA, run unit
tests:");
+ eprintln!(" cargo test -p qdp-core --lib");
+ return;
+ }
+ };
+
+ // Create test data: 18 qubits = 262144 elements (ensures > 1MB threshold
for pipeline)
+ // Pipeline is only used for data >= 1MB (131072 elements), so we use 18
qubits to ensure
+ // we're well above the threshold and will generate multiple chunks for
better testing
+ const NUM_QUBITS: usize = 18;
+ const VECTOR_LEN: usize = 1 << NUM_QUBITS; // 262144 elements = 2MB
+ const NUM_SAMPLES: usize = 10;
+
+ println!("Test Configuration:");
+ println!(" num qubits: {}", NUM_QUBITS);
+ println!(" vector length: {}", VECTOR_LEN);
+ println!(" num samples: {}", NUM_SAMPLES);
+ println!();
+
+ // Generate test data
+ let mut test_data = vec![0.0f64; NUM_SAMPLES * VECTOR_LEN];
+ for i in 0..NUM_SAMPLES {
+ let offset = i * VECTOR_LEN;
+ for j in 0..VECTOR_LEN {
+ test_data[offset + j] = (j as f64) / (VECTOR_LEN as f64);
+ }
+ }
+
+ println!("Running encoding with observability...");
+ println!();
+
+ // Use encode() method (not encode_batch) to trigger pipeline and
observability
+ // encode_batch uses synchronous path and doesn't use the dual-stream
pipeline
+ // We'll encode each sample individually to trigger the async pipeline
+ for i in 0..NUM_SAMPLES {
+ let sample = &test_data[i * VECTOR_LEN..(i + 1) * VECTOR_LEN];
+ match engine.encode(sample, NUM_QUBITS, "amplitude") {
+ Ok(ptr) => unsafe {
+ let managed = &mut *ptr;
+ if let Some(deleter) = managed.deleter.take() {
+ deleter(ptr);
+ }
+ },
+ Err(e) => {
+ eprintln!("✗ Encoding failed for sample {}: {:?}", i, e);
+ return;
+ }
+ }
+ }
+
+ println!("✓ Encoding completed successfully");
+ println!();
+ println!("Note: If observability is enabled, check the log output above
for:");
+ if enable_pool_metrics {
+ println!(" - Pool Utilization metrics");
+ }
+ if enable_overlap_tracking {
+ println!(" - H2D overlap percentages");
+ }
+
+ println!();
+ println!("=== Test Complete ===");
+}
diff --git a/qdp/qdp-core/src/error.rs b/qdp/qdp-core/src/error.rs
index 387e7fc48..94225357b 100644
--- a/qdp/qdp-core/src/error.rs
+++ b/qdp/qdp-core/src/error.rs
@@ -59,6 +59,7 @@ pub fn cuda_error_to_string(code: i32) -> &'static str {
12 => "cudaErrorInvalidDevicePointer",
17 => "cudaErrorInvalidMemcpyDirection",
30 => "cudaErrorUnknown",
+ 400 => "cudaErrorInvalidResourceHandle",
999 => "CUDA unavailable (non-Linux stub)",
_ => "Unknown CUDA error",
}
diff --git a/qdp/qdp-core/src/gpu/buffer_pool.rs
b/qdp/qdp-core/src/gpu/buffer_pool.rs
index 6604594be..cfe7e0ef9 100644
--- a/qdp/qdp-core/src/gpu/buffer_pool.rs
+++ b/qdp/qdp-core/src/gpu/buffer_pool.rs
@@ -19,9 +19,12 @@
//! page-locked buffers to avoid repeated cudaHostAlloc / cudaFreeHost.
use std::sync::{Arc, Condvar, Mutex, MutexGuard};
+use std::time::Instant;
use crate::error::{MahoutError, Result};
use crate::gpu::memory::PinnedHostBuffer;
+#[cfg(target_os = "linux")]
+use crate::gpu::pool_metrics::PoolMetrics;
/// Handle that automatically returns a buffer to the pool on drop.
#[cfg(target_os = "linux")]
@@ -107,7 +110,27 @@ impl PinnedBufferPool {
/// Acquire a pinned buffer, blocking until one is available.
pub fn acquire(self: &Arc<Self>) -> PinnedBufferHandle {
+ self.acquire_with_metrics(None)
+ }
+
+ /// Acquire a pinned buffer with optional metrics tracking.
+ ///
+ /// # Arguments
+ /// * `metrics` - Optional PoolMetrics instance for tracking utilization
+ ///
+ /// If metrics is provided, records the number of available buffers at
acquire time
+ /// and tracks wait times if the pool is empty.
+ pub fn acquire_with_metrics(
+ self: &Arc<Self>,
+ metrics: Option<&PoolMetrics>,
+ ) -> PinnedBufferHandle {
let mut free = self.lock_free();
+
+ // Record available count while holding the lock to avoid TOCTOU race
condition
+ let available = free.len();
+ if let Some(m) = metrics {
+ m.record_acquire(available);
+ }
loop {
if let Some(buffer) = free.pop() {
return PinnedBufferHandle {
@@ -115,10 +138,22 @@ impl PinnedBufferPool {
pool: Arc::clone(self),
};
}
- free = self
- .available_cv
- .wait(free)
- .unwrap_or_else(|poisoned| poisoned.into_inner());
+
+ // Record wait if metrics enabled
+ if let Some(m) = metrics {
+ let wait_start = Instant::now();
+ free = self
+ .available_cv
+ .wait(free)
+ .unwrap_or_else(|poisoned| poisoned.into_inner());
+ let wait_time = wait_start.elapsed();
+ m.record_wait(wait_time.as_nanos() as u64);
+ } else {
+ free = self
+ .available_cv
+ .wait(free)
+ .unwrap_or_else(|poisoned| poisoned.into_inner());
+ }
}
}
diff --git a/qdp/qdp-core/src/gpu/cuda_ffi.rs b/qdp/qdp-core/src/gpu/cuda_ffi.rs
index fc4582a14..6d8e4cb74 100644
--- a/qdp/qdp-core/src/gpu/cuda_ffi.rs
+++ b/qdp/qdp-core/src/gpu/cuda_ffi.rs
@@ -20,6 +20,14 @@ use std::ffi::c_void;
pub(crate) const CUDA_MEMCPY_HOST_TO_DEVICE: u32 = 1;
pub(crate) const CUDA_EVENT_DISABLE_TIMING: u32 = 0x02;
+pub(crate) const CUDA_EVENT_DEFAULT: u32 = 0x00;
+
+// CUDA error codes
+pub(crate) const CUDA_SUCCESS: i32 = 0;
+// Note: CUDA_ERROR_NOT_READY may be used in future optimizations for
non-blocking event checks
+// Reference:
https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__TYPES.html#group__CUDART__TYPES_1g3f51e3575c2178246db0a94a430e0028
+#[allow(dead_code)]
+pub(crate) const CUDA_ERROR_NOT_READY: i32 = 34;
unsafe extern "C" {
pub(crate) fn cudaHostAlloc(pHost: *mut *mut c_void, size: usize, flags:
u32) -> i32;
@@ -47,4 +55,26 @@ unsafe extern "C" {
count: usize,
stream: *mut c_void,
) -> i32;
+
+ /// Non-blocking event query
+ ///
+ /// Returns CUDA_SUCCESS if the event has completed, CUDA_ERROR_NOT_READY
if not.
+ /// Reference:
https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__EVENT.html
+ ///
+ /// Note: May be used in future optimizations for non-blocking event
checks to reduce
+ /// synchronization overhead in pipeline operations.
+ #[allow(dead_code)]
+ pub(crate) fn cudaEventQuery(event: *mut c_void) -> i32;
+
+ /// Blocking event synchronization
+ ///
+ /// Waits until the completion of all work currently captured in the event.
+ /// Reference:
https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__EVENT.html
+ pub(crate) fn cudaEventSynchronize(event: *mut c_void) -> i32;
+
+ /// Calculate elapsed time between two events (in milliseconds)
+ ///
+ /// Both events must have been created with CUDA_EVENT_DEFAULT flag.
+ /// Reference:
https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__EVENT.html
+ pub(crate) fn cudaEventElapsedTime(ms: *mut f32, start: *mut c_void, end:
*mut c_void) -> i32;
}
diff --git a/qdp/qdp-core/src/gpu/mod.rs b/qdp/qdp-core/src/gpu/mod.rs
index 451da1498..964662af7 100644
--- a/qdp/qdp-core/src/gpu/mod.rs
+++ b/qdp/qdp-core/src/gpu/mod.rs
@@ -18,7 +18,11 @@
pub mod buffer_pool;
pub mod encodings;
pub mod memory;
+#[cfg(target_os = "linux")]
+pub mod overlap_tracker;
pub mod pipeline;
+#[cfg(target_os = "linux")]
+pub mod pool_metrics;
#[cfg(target_os = "linux")]
pub(crate) mod cuda_ffi;
@@ -29,5 +33,9 @@ pub use encodings::{AmplitudeEncoder, AngleEncoder,
BasisEncoder, QuantumEncoder
pub use memory::GpuStateVector;
pub use pipeline::run_dual_stream_pipeline;
+#[cfg(target_os = "linux")]
+pub use overlap_tracker::OverlapTracker;
#[cfg(target_os = "linux")]
pub use pipeline::PipelineContext;
+#[cfg(target_os = "linux")]
+pub use pool_metrics::{PoolMetrics, PoolUtilizationReport};
diff --git a/qdp/qdp-core/src/gpu/overlap_tracker.rs
b/qdp/qdp-core/src/gpu/overlap_tracker.rs
new file mode 100644
index 000000000..c18dcfdfa
--- /dev/null
+++ b/qdp/qdp-core/src/gpu/overlap_tracker.rs
@@ -0,0 +1,465 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Overlap tracking for H2D copy and compute operations.
+//!
+//! Uses CUDA events to measure the overlap between host-to-device copy
operations
+//! and compute operations, enabling verification of the >60% overlap target.
+
+use crate::error::{MahoutError, Result};
+use crate::gpu::cuda_ffi::{
+ CUDA_EVENT_DEFAULT, CUDA_SUCCESS, cudaEventCreateWithFlags,
cudaEventDestroy,
+ cudaEventElapsedTime, cudaEventRecord, cudaEventSynchronize,
+};
+use cudarc::driver::safe::CudaStream;
+use std::ffi::c_void;
+
/// Tracks overlap between H2D copy and compute operations using CUDA events.
///
/// Creates events for each pool slot to track copy and compute start/end times.
/// Can be optionally enabled via environment variable to minimize overhead when disabled.
pub struct OverlapTracker {
    /// Per-slot events recorded on the copy stream when an H2D copy is enqueued.
    copy_start_events: Vec<*mut c_void>,
    /// Per-slot events recorded on the copy stream after the H2D copy is enqueued.
    copy_end_events: Vec<*mut c_void>,
    /// Per-slot events recorded on the compute stream before the kernel launch.
    compute_start_events: Vec<*mut c_void>,
    /// Per-slot events recorded on the compute stream after the kernel launch.
    compute_end_events: Vec<*mut c_void>,
    /// Number of pool slots; chunk indices map to slots modulo this value.
    pool_size: usize,
    /// When false, the event vectors are empty and every method is a no-op.
    enabled: bool,
}
+
+impl OverlapTracker {
+ /// Create a new OverlapTracker for the given pool size.
+ ///
+ /// If disabled, no events are created and all operations are no-ops.
+ pub fn new(pool_size: usize, enabled: bool) -> Result<Self> {
+ if !enabled {
+ return Ok(Self {
+ copy_start_events: Vec::new(),
+ copy_end_events: Vec::new(),
+ compute_start_events: Vec::new(),
+ compute_end_events: Vec::new(),
+ pool_size,
+ enabled: false,
+ });
+ }
+
+ let mut copy_start: Vec<*mut c_void> = Vec::with_capacity(pool_size);
+ let mut copy_end: Vec<*mut c_void> = Vec::with_capacity(pool_size);
+ let mut compute_start: Vec<*mut c_void> =
Vec::with_capacity(pool_size);
+ let mut compute_end: Vec<*mut c_void> = Vec::with_capacity(pool_size);
+
+ unsafe {
+ for _ in 0..pool_size {
+ let mut ev: *mut c_void = std::ptr::null_mut();
+ let ret = cudaEventCreateWithFlags(&mut ev,
CUDA_EVENT_DEFAULT);
+ if ret != CUDA_SUCCESS {
+ Self::cleanup_events(&[©_start, ©_end,
&compute_start, &compute_end]);
+ return Err(MahoutError::Cuda(format!(
+ "Failed to create CUDA event: {}",
+ ret
+ )));
+ }
+ copy_start.push(ev);
+
+ let mut ev: *mut c_void = std::ptr::null_mut();
+ let ret = cudaEventCreateWithFlags(&mut ev,
CUDA_EVENT_DEFAULT);
+ if ret != CUDA_SUCCESS {
+ Self::cleanup_events(&[©_start, ©_end,
&compute_start, &compute_end]);
+ return Err(MahoutError::Cuda(format!(
+ "Failed to create CUDA event: {}",
+ ret
+ )));
+ }
+ copy_end.push(ev);
+
+ let mut ev: *mut c_void = std::ptr::null_mut();
+ let ret = cudaEventCreateWithFlags(&mut ev,
CUDA_EVENT_DEFAULT);
+ if ret != CUDA_SUCCESS {
+ Self::cleanup_events(&[©_start, ©_end,
&compute_start, &compute_end]);
+ return Err(MahoutError::Cuda(format!(
+ "Failed to create CUDA event: {}",
+ ret
+ )));
+ }
+ compute_start.push(ev);
+
+ let mut ev: *mut c_void = std::ptr::null_mut();
+ let ret = cudaEventCreateWithFlags(&mut ev,
CUDA_EVENT_DEFAULT);
+ if ret != CUDA_SUCCESS {
+ Self::cleanup_events(&[©_start, ©_end,
&compute_start, &compute_end]);
+ return Err(MahoutError::Cuda(format!(
+ "Failed to create CUDA event: {}",
+ ret
+ )));
+ }
+ compute_end.push(ev);
+ }
+ }
+
+ Ok(Self {
+ copy_start_events: copy_start,
+ copy_end_events: copy_end,
+ compute_start_events: compute_start,
+ compute_end_events: compute_end,
+ pool_size,
+ enabled,
+ })
+ }
+
+ /// Destroys CUDA events. Caller must ensure all pointers are valid or
null.
+ ///
+ /// # Safety
+ /// This function is `unsafe` because it calls `cudaEventDestroy` on raw
pointers.
+ /// The inner `unsafe` block is required (Rust 2024: `unsafe fn` body is
safe by default).
+ ///
+ /// ## Safety Requirements
+ /// - All pointers in `events` must either be valid CUDA event handles
+ /// created with `cudaEventCreateWithFlags`, or null pointers.
+ /// - Each event must not be destroyed more than once.
+ /// - The events must not be in use by any active CUDA operations.
+ /// - The events must belong to the same CUDA context.
+ unsafe fn cleanup_events(events: &[&Vec<*mut c_void>]) {
+ for event_vec in events {
+ for ev in event_vec.iter() {
+ if !ev.is_null() {
+ unsafe {
+ let _ = cudaEventDestroy(*ev);
+ }
+ }
+ }
+ }
+ }
+
+ fn validate_slot(&self, slot: usize) -> Result<()> {
+ if slot >= self.pool_size {
+ return Err(MahoutError::InvalidInput(format!(
+ "Slot {} out of range (max: {})",
+ slot,
+ self.pool_size.saturating_sub(1)
+ )));
+ }
+ Ok(())
+ }
+
+ fn record_event(&self, event: *mut c_void, stream: &CudaStream) ->
Result<()> {
+ // Validate event is not null before recording
+ // Ref:
https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__EVENT.html
+ // cudaEventRecord returns cudaErrorInvalidResourceHandle if event is
NULL
+ if event.is_null() {
+ return Err(MahoutError::Cuda(
+ "Cannot record event: event is null (invalid resource
handle)".to_string(),
+ ));
+ }
+
+ unsafe {
+ // Ref:
https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__EVENT.html
+ // cudaEventRecord captures the contents of stream at the time of
this call
+ // The operation is asynchronous - use cudaEventQuery or
cudaEventSynchronize
+ // to determine when the event has actually been recorded
+ let ret = cudaEventRecord(event, stream.stream as *mut c_void);
+ if ret != CUDA_SUCCESS {
+ return Err(MahoutError::Cuda(format!(
+ "cudaEventRecord failed: {} ({}). Event may be invalid or
stream may be invalid.",
+ ret,
+ crate::error::cuda_error_to_string(ret)
+ )));
+ }
+ }
+ Ok(())
+ }
+
+ /// Record the start of a copy operation on the copy stream.
+ pub fn record_copy_start(&self, stream: &CudaStream, slot: usize) ->
Result<()> {
+ if !self.enabled {
+ return Ok(());
+ }
+ self.validate_slot(slot)?;
+ self.record_event(self.copy_start_events[slot], stream)
+ }
+
+ /// Record the end of a copy operation on the copy stream.
+ pub fn record_copy_end(&self, stream: &CudaStream, slot: usize) ->
Result<()> {
+ if !self.enabled {
+ return Ok(());
+ }
+ self.validate_slot(slot)?;
+ self.record_event(self.copy_end_events[slot], stream)
+ }
+
+ /// Record the start of a compute operation on the compute stream.
+ pub fn record_compute_start(&self, stream: &CudaStream, slot: usize) ->
Result<()> {
+ if !self.enabled {
+ return Ok(());
+ }
+ self.validate_slot(slot)?;
+ self.record_event(self.compute_start_events[slot], stream)
+ }
+
+ /// Record the end of a compute operation on the compute stream.
+ pub fn record_compute_end(&self, stream: &CudaStream, slot: usize) ->
Result<()> {
+ if !self.enabled {
+ return Ok(());
+ }
+ self.validate_slot(slot)?;
+ self.record_event(self.compute_end_events[slot], stream)
+ }
+
+ /// Calculate the overlap ratio for a specific chunk.
+ ///
+ /// Returns overlap ratio (0.0-1.0): min(copy_time, compute_time) /
max(copy_time, compute_time)
+ ///
+ /// Ref:
https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__EVENT.html
+ /// Events must be recorded before querying. This function waits for
events to complete.
+ ///
+ /// Note: For detailed timing diagnostics, use
`calculate_overlap_with_timing()`.
+ pub fn calculate_overlap(&self, chunk_idx: usize) -> Result<f64> {
+ self.calculate_overlap_with_timing(chunk_idx)
+ .map(|(overlap, _, _, _)| overlap)
+ }
+
+ /// Calculate overlap ratio with detailed timing information.
+ ///
+ /// Returns (overlap_ratio, copy_time_ms, compute_time_ms, overlap_time_ms)
+ /// for detailed diagnostics at DEBUG level.
+ fn calculate_overlap_with_timing(&self, chunk_idx: usize) -> Result<(f64,
f32, f32, f32)> {
+ if !self.enabled {
+ return Ok((0.0, 0.0, 0.0, 0.0));
+ }
+
+ let slot = chunk_idx % self.pool_size;
+
+ // Validate events are not null before querying
+ // Ref:
https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__EVENT.html
+ // cudaEventQuery returns cudaErrorInvalidResourceHandle if event is
NULL
+ if self.copy_end_events[slot].is_null() ||
self.compute_end_events[slot].is_null() {
+ return Err(MahoutError::Cuda(format!(
+ "Event is null for chunk {} slot {}: events may not have been
created",
+ chunk_idx, slot
+ )));
+ }
+
+ unsafe {
+ // Wait for events to complete before calculating elapsed time
+ // Ref:
https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__EVENT.html
+ //
+ // Critical: According to the CUDA Runtime API documentation:
+ // 1. cudaEventRecord() is asynchronous - the event may not be
recorded immediately
+ // 2. Before the first call to cudaEventRecord(), cudaEventQuery()
returns cudaSuccess
+ // (because an empty event is considered "complete")
+ // 3. cudaEventElapsedTime() returns
cudaErrorInvalidResourceHandle (600) if either
+ // event has not been recorded with cudaEventRecord()
+ //
+ // Solution: Use cudaEventSynchronize() instead of
cudaEventQuery() to ensure events
+ // are both recorded AND completed. cudaEventSynchronize() blocks
until the event
+ // has been recorded and all captured work has completed.
+ //
+ // We synchronize end events first (they complete last), then
start events.
+ let ret = cudaEventSynchronize(self.copy_end_events[slot]);
+ if ret != CUDA_SUCCESS {
+ return Err(MahoutError::Cuda(format!(
+ "cudaEventSynchronize (copy end) failed: {} ({}). Event
may not have been recorded.",
+ ret,
+ crate::error::cuda_error_to_string(ret)
+ )));
+ }
+
+ let ret = cudaEventSynchronize(self.compute_end_events[slot]);
+ if ret != CUDA_SUCCESS {
+ return Err(MahoutError::Cuda(format!(
+ "cudaEventSynchronize (compute end) failed: {} ({}). Event
may not have been recorded.",
+ ret,
+ crate::error::cuda_error_to_string(ret)
+ )));
+ }
+
+ // Verify start events are also complete (they should complete
before end events)
+ let ret = cudaEventSynchronize(self.copy_start_events[slot]);
+ if ret != CUDA_SUCCESS {
+ return Err(MahoutError::Cuda(format!(
+ "cudaEventSynchronize (copy start) failed: {} ({}). Event
may not have been recorded.",
+ ret,
+ crate::error::cuda_error_to_string(ret)
+ )));
+ }
+
+ let ret = cudaEventSynchronize(self.compute_start_events[slot]);
+ if ret != CUDA_SUCCESS {
+ return Err(MahoutError::Cuda(format!(
+ "cudaEventSynchronize (compute start) failed: {} ({}).
Event may not have been recorded.",
+ ret,
+ crate::error::cuda_error_to_string(ret)
+ )));
+ }
+ }
+
+ let mut copy_time_ms: f32 = 0.0;
+ let mut compute_time_ms: f32 = 0.0;
+
+ unsafe {
+ // Ref:
https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__EVENT.html
+ // cudaEventElapsedTime returns cudaErrorInvalidResourceHandle
(error 600) if:
+ // 1. Either event has not been recorded with cudaEventRecord()
+ // 2. Either event was created with cudaEventDisableTiming flag
+ // 3. Either event is NULL
+ let ret = cudaEventElapsedTime(
+ &mut copy_time_ms,
+ self.copy_start_events[slot],
+ self.copy_end_events[slot],
+ );
+ if ret != CUDA_SUCCESS {
+ return Err(MahoutError::Cuda(format!(
+ "cudaEventElapsedTime (copy) failed: {} ({}). Events may
not have been recorded properly.",
+ ret,
+ crate::error::cuda_error_to_string(ret)
+ )));
+ }
+
+ let ret = cudaEventElapsedTime(
+ &mut compute_time_ms,
+ self.compute_start_events[slot],
+ self.compute_end_events[slot],
+ );
+ if ret != CUDA_SUCCESS {
+ return Err(MahoutError::Cuda(format!(
+ "cudaEventElapsedTime (compute) failed: {} ({}). Events
may not have been recorded properly.",
+ ret,
+ crate::error::cuda_error_to_string(ret)
+ )));
+ }
+ }
+
+ let overlap_time_ms = copy_time_ms.min(compute_time_ms);
+ let total_time = copy_time_ms.max(compute_time_ms);
+
+ let overlap_ratio = if total_time > 0.0 {
+ (overlap_time_ms / total_time) as f64
+ } else {
+ 0.0
+ };
+
+ Ok((
+ overlap_ratio,
+ copy_time_ms,
+ compute_time_ms,
+ overlap_time_ms,
+ ))
+ }
+
+ /// Log the overlap ratio for a specific chunk.
+ ///
+ /// Logs overlap percentage at INFO level (important performance metric).
+ /// Logs detailed timing information at DEBUG level for troubleshooting.
+ /// Ref: https://docs.rs/env_logger/latest/env_logger/
+ /// - RUST_LOG=debug shows all levels (DEBUG, INFO, WARN, ERROR)
+ /// - RUST_LOG=info shows INFO and above (INFO, WARN, ERROR)
+ ///
+ /// According to Rust logging best practices:
+ /// - INFO: Useful information about normal operation (overlap percentage)
+ /// - DEBUG: Lower priority information for troubleshooting (detailed
timing diagnostics)
+ pub fn log_overlap(&self, chunk_idx: usize) -> Result<()> {
+ if !self.enabled {
+ return Ok(());
+ }
+
+ // Try to calculate overlap with detailed timing information
+ match self.calculate_overlap_with_timing(chunk_idx) {
+ Ok((overlap, copy_time_ms, compute_time_ms, overlap_time_ms)) => {
+ // Log overlap percentage at INFO level (important performance
metric)
+ // Ref: Rust logging best practices - INFO for normal
operation metrics
+ log::info!("Chunk {}: H2D overlap = {:.1}%", chunk_idx,
overlap * 100.0);
+
+ // Log detailed timing information at DEBUG level for
troubleshooting
+ // Ref: Rust logging best practices - DEBUG for detailed
diagnostics
+ if log::log_enabled!(log::Level::Debug) {
+ log::debug!(
+ "Chunk {}: H2D overlap details - copy={:.3}ms,
compute={:.3}ms, overlap={:.3}ms, ratio={:.1}%",
+ chunk_idx,
+ copy_time_ms,
+ compute_time_ms,
+ overlap_time_ms,
+ overlap * 100.0
+ );
+ }
+
+ if overlap < 0.6 {
+ log::warn!(
+ "Chunk {}: Overlap below target (60%), current =
{:.1}%",
+ chunk_idx,
+ overlap * 100.0
+ );
+ }
+
+ Ok(())
+ }
+ Err(e) => {
+ // Log error at INFO level (visible in both debug and info
modes)
+ // Ref:
https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__EVENT.html
+ log::info!(
+ "Chunk {}: H2D overlap calculation unavailable: {}. This
may indicate event lifecycle issues or timing constraints.",
+ chunk_idx,
+ e
+ );
+
+ // Log additional diagnostic details at DEBUG level for
troubleshooting
+ if log::log_enabled!(log::Level::Debug) {
+ let slot = chunk_idx % self.pool_size;
+ log::debug!(
+ "Chunk {} (slot {}): Overlap calculation failed: {}.
Event pointers: copy_end={:?}, compute_end={:?}",
+ chunk_idx,
+ slot,
+ e,
+ self.copy_end_events[slot].is_null(),
+ self.compute_end_events[slot].is_null()
+ );
+ }
+
+ // Return error so caller knows calculation failed
+ Err(e)
+ }
+ }
+ }
+}
+
+impl Drop for OverlapTracker {
+ fn drop(&mut self) {
+ if !self.enabled {
+ return;
+ }
+ unsafe {
+ Self::cleanup_events(&[
+ &self.copy_start_events,
+ &self.copy_end_events,
+ &self.compute_start_events,
+ &self.compute_end_events,
+ ]);
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_overlap_tracker_disabled() {
        // A disabled tracker must be constructible without CUDA and own no events.
        let tracker = OverlapTracker::new(2, false).unwrap();
        assert!(!tracker.enabled);
        for events in [
            &tracker.copy_start_events,
            &tracker.copy_end_events,
            &tracker.compute_start_events,
            &tracker.compute_end_events,
        ] {
            assert!(events.is_empty());
        }
    }
}
diff --git a/qdp/qdp-core/src/gpu/pipeline.rs b/qdp/qdp-core/src/gpu/pipeline.rs
index 75db1f1d2..073ab1c7d 100644
--- a/qdp/qdp-core/src/gpu/pipeline.rs
+++ b/qdp/qdp-core/src/gpu/pipeline.rs
@@ -33,6 +33,10 @@ use crate::gpu::cuda_ffi::{
};
#[cfg(target_os = "linux")]
use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error};
+#[cfg(target_os = "linux")]
+use crate::gpu::overlap_tracker::OverlapTracker;
+#[cfg(target_os = "linux")]
+use crate::gpu::pool_metrics::PoolMetrics;
use cudarc::driver::{CudaDevice, CudaSlice, DevicePtr, safe::CudaStream};
use std::ffi::c_void;
use std::sync::Arc;
@@ -58,7 +62,6 @@ fn validate_event_slot(events: &[*mut c_void], slot: usize)
-> Result<()> {
}
#[cfg(target_os = "linux")]
-#[allow(unsafe_op_in_unsafe_fn)]
impl PipelineContext {
pub fn new(device: &Arc<CudaDevice>, event_slots: usize) -> Result<Self> {
let stream_compute = device
@@ -103,18 +106,20 @@ impl PipelineContext {
len_elements: usize,
) -> Result<()> {
crate::profile_scope!("GPU::H2D_Copy");
- let ret = cudaMemcpyAsync(
- dst,
- src,
- len_elements * std::mem::size_of::<f64>(),
- CUDA_MEMCPY_HOST_TO_DEVICE,
- self.stream_copy.stream as *mut c_void,
- );
- if ret != 0 {
- return Err(MahoutError::Cuda(format!(
- "Async H2D copy failed with CUDA error: {}",
- ret
- )));
+ unsafe {
+ let ret = cudaMemcpyAsync(
+ dst,
+ src,
+ len_elements * std::mem::size_of::<f64>(),
+ CUDA_MEMCPY_HOST_TO_DEVICE,
+ self.stream_copy.stream as *mut c_void,
+ );
+ if ret != 0 {
+ return Err(MahoutError::Cuda(format!(
+ "Async H2D copy failed with CUDA error: {}",
+ ret
+ )));
+ }
}
Ok(())
}
@@ -128,15 +133,17 @@ impl PipelineContext {
crate::profile_scope!("GPU::CopyEventRecord");
validate_event_slot(&self.events_copy_done, slot)?;
- let ret = cudaEventRecord(
- self.events_copy_done[slot],
- self.stream_copy.stream as *mut c_void,
- );
- if ret != 0 {
- return Err(MahoutError::Cuda(format!(
- "cudaEventRecord failed: {}",
- ret
- )));
+ unsafe {
+ let ret = cudaEventRecord(
+ self.events_copy_done[slot],
+ self.stream_copy.stream as *mut c_void,
+ );
+ if ret != 0 {
+ return Err(MahoutError::Cuda(format!(
+ "cudaEventRecord failed: {}",
+ ret
+ )));
+ }
}
Ok(())
}
@@ -150,16 +157,18 @@ impl PipelineContext {
crate::profile_scope!("GPU::StreamWait");
validate_event_slot(&self.events_copy_done, slot)?;
- let ret = cudaStreamWaitEvent(
- self.stream_compute.stream as *mut c_void,
- self.events_copy_done[slot],
- 0,
- );
- if ret != 0 {
- return Err(MahoutError::Cuda(format!(
- "cudaStreamWaitEvent failed: {}",
- ret
- )));
+ unsafe {
+ let ret = cudaStreamWaitEvent(
+ self.stream_compute.stream as *mut c_void,
+ self.events_copy_done[slot],
+ 0,
+ );
+ if ret != 0 {
+ return Err(MahoutError::Cuda(format!(
+ "cudaStreamWaitEvent failed: {}",
+ ret
+ )));
+ }
}
Ok(())
}
@@ -170,12 +179,14 @@ impl PipelineContext {
/// The context and its copy stream must be valid and not destroyed while
syncing.
pub unsafe fn sync_copy_stream(&self) -> Result<()> {
crate::profile_scope!("Pipeline::SyncCopy");
- let ret = cudaStreamSynchronize(self.stream_copy.stream as *mut
c_void);
- if ret != 0 {
- return Err(MahoutError::Cuda(format!(
- "cudaStreamSynchronize(copy) failed: {}",
- ret
- )));
+ unsafe {
+ let ret = cudaStreamSynchronize(self.stream_copy.stream as *mut
c_void);
+ if ret != 0 {
+ return Err(MahoutError::Cuda(format!(
+ "cudaStreamSynchronize(copy) failed: {}",
+ ret
+ )));
+ }
}
Ok(())
}
@@ -220,6 +231,7 @@ impl Drop for PipelineContext {
/// - Data chunking and async H2D copy
/// - Buffer lifetime management
/// - Stream synchronization
+/// - Optional observability (pool metrics and overlap tracking)
///
/// The caller provides a `kernel_launcher` closure that handles the
/// specific kernel launch logic for each chunk.
@@ -229,6 +241,10 @@ impl Drop for PipelineContext {
/// * `host_data` - Full source data to process
/// * `kernel_launcher` - Closure that launches the specific kernel for each
chunk
///
+/// # Environment Variables
+/// * `QDP_ENABLE_POOL_METRICS` - Enable pool utilization metrics (set to "1"
or "true")
+/// * `QDP_ENABLE_OVERLAP_TRACKING` - Enable H2D overlap tracking (set to "1"
or "true")
+///
/// # Example
/// ```rust,ignore
/// run_dual_stream_pipeline(device, host_data, |stream, input_ptr, offset,
len| {
@@ -319,11 +335,34 @@ where
// Pinned host staging pool sized to the current chunking strategy
(double-buffer by default).
const PINNED_POOL_SIZE: usize = 2; // double buffering
+
+ // Check environment variables for observability features
+ let enable_pool_metrics = std::env::var("QDP_ENABLE_POOL_METRICS")
+ .map(|s| s == "1" || s.eq_ignore_ascii_case("true"))
+ .unwrap_or(false);
+ let enable_overlap_tracking = std::env::var("QDP_ENABLE_OVERLAP_TRACKING")
+ .map(|s| s == "1" || s.eq_ignore_ascii_case("true"))
+ .unwrap_or(false);
+
// 1. Create dual streams with per-slot events to coordinate copy ->
compute
let ctx = PipelineContext::new(device, PINNED_POOL_SIZE)?;
let pinned_pool = PinnedBufferPool::new(PINNED_POOL_SIZE,
chunk_size_elements)
.map_err(|e| MahoutError::Cuda(format!("Failed to create pinned buffer
pool: {}", e)))?;
+ // Initialize observability tools (optional)
+ let pool_metrics = if enable_pool_metrics {
+ Some(Arc::new(PoolMetrics::new()))
+ } else {
+ None
+ };
+ let overlap_tracker = if enable_overlap_tracking {
+ Some(OverlapTracker::new(PINNED_POOL_SIZE, true)?)
+ } else {
+ None
+ };
+
+ // 2. Chunk size: 8MB per chunk (balance between overhead and overlap
opportunity)
+ // TODO: tune dynamically based on GPU/PCIe bandwidth.
// 3. Keep temporary buffers alive until all streams complete
// This prevents Rust from dropping them while GPU is still using them
let mut keep_alive_buffers: Vec<CudaSlice<f64>> = Vec::new();
@@ -351,7 +390,11 @@ where
})?;
// Acquire pinned staging buffer and populate it with the current chunk
- let mut pinned_buf = pinned_pool.acquire();
+ let mut pinned_buf = if let Some(ref metrics) = pool_metrics {
+ pinned_pool.acquire_with_metrics(Some(metrics.as_ref()))
+ } else {
+ pinned_pool.acquire()
+ };
{
crate::profile_scope!("GPU::H2D_Stage");
pinned_buf.as_slice_mut()[..chunk.len()].copy_from_slice(chunk);
@@ -361,12 +404,38 @@ where
// Uses CUDA Runtime API (cudaMemcpyAsync) for true async copy
{
crate::profile_scope!("GPU::H2DCopyAsync");
+
+ // Record copy start if overlap tracking enabled
+ // Note: Overlap tracking is optional observability - failures
should not stop the pipeline
+ if let Some(ref tracker) = overlap_tracker
+ && let Err(e) = tracker.record_copy_start(&ctx.stream_copy,
event_slot)
+ {
+ log::warn!(
+ "Chunk {}: Failed to record copy start event: {}. Overlap
tracking may be incomplete.",
+ chunk_idx,
+ e
+ );
+ }
+
unsafe {
ctx.async_copy_to_device(
pinned_buf.ptr() as *const c_void,
*input_chunk_dev.device_ptr() as *mut c_void,
chunk.len(),
)?;
+
+ // Record copy end if overlap tracking enabled
+ // Note: Overlap tracking is optional observability - failures
should not stop the pipeline
+ if let Some(ref tracker) = overlap_tracker
+ && let Err(e) = tracker.record_copy_end(&ctx.stream_copy,
event_slot)
+ {
+ log::warn!(
+ "Chunk {}: Failed to record copy end event: {}.
Overlap tracking may be incomplete.",
+ chunk_idx,
+ e
+ );
+ }
+
ctx.record_copy_done(event_slot)?;
ctx.wait_for_copy(event_slot)?;
}
@@ -388,7 +457,61 @@ where
// Invoke caller's kernel launcher (non-blocking)
{
crate::profile_scope!("GPU::KernelLaunchAsync");
+
+ // Record compute start if overlap tracking enabled
+ // Note: Overlap tracking is optional observability - failures
should not stop the pipeline
+ if let Some(ref tracker) = overlap_tracker
+ && let Err(e) =
tracker.record_compute_start(&ctx.stream_compute, event_slot)
+ {
+ log::warn!(
+ "Chunk {}: Failed to record compute start event: {}.
Overlap tracking may be incomplete.",
+ chunk_idx,
+ e
+ );
+ }
+
kernel_launcher(&ctx.stream_compute, input_ptr, chunk_offset,
chunk.len())?;
+
+ // Record compute end if overlap tracking enabled
+ // Note: Overlap tracking is optional observability - failures
should not stop the pipeline
+ if let Some(ref tracker) = overlap_tracker
+ && let Err(e) =
tracker.record_compute_end(&ctx.stream_compute, event_slot)
+ {
+ log::warn!(
+ "Chunk {}: Failed to record compute end event: {}. Overlap
tracking may be incomplete.",
+ chunk_idx,
+ e
+ );
+ }
+ }
+
+ // Log overlap if tracking enabled
+ // Ref:
https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__EVENT.html
+ // We log after recording events. log_overlap will wait for events to
complete
+ // before calculating elapsed time. This ensures accurate measurements.
+ //
+ // Note: log_overlap now handles both success and failure cases
internally,
+ // logging at appropriate levels (INFO for visibility, DEBUG for
details).
+ #[allow(clippy::manual_is_multiple_of)]
+ if let Some(ref tracker) = overlap_tracker
+ && (chunk_idx % 10 == 0 || chunk_idx == 0)
+ {
+ // Only log every Nth chunk to avoid excessive logging
+ // Note: log_overlap waits for events to complete, which may take
time
+ // If events fail (e.g., invalid resource handle), log_overlap
will log
+ // at INFO level so it's visible in both debug and info modes
+ if let Err(e) = tracker.log_overlap(chunk_idx) {
+ // log_overlap already logged the error at INFO level
+ // We only need to log additional details at DEBUG level if
needed
+ if log::log_enabled!(log::Level::Debug) {
+ log::debug!(
+ "Overlap tracking failed for chunk {}: {}. Pipeline
continues normally.",
+ chunk_idx,
+ e
+ );
+ }
+ // Don't fail the pipeline - overlap tracking is optional
observability
+ }
}
// Keep buffer alive until synchronization
@@ -420,5 +543,11 @@ where
// This is safe because all GPU operations have completed
drop(keep_alive_buffers);
+ // Print pool metrics summary if enabled
+ if let Some(ref metrics) = pool_metrics {
+ let report = metrics.report();
+ report.print_summary();
+ }
+
Ok(())
}
diff --git a/qdp/qdp-core/src/gpu/pool_metrics.rs
b/qdp/qdp-core/src/gpu/pool_metrics.rs
new file mode 100644
index 000000000..60085aa6a
--- /dev/null
+++ b/qdp/qdp-core/src/gpu/pool_metrics.rs
@@ -0,0 +1,242 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Pool utilization metrics for diagnosing pool starvation.
+//!
+//! Provides lightweight, thread-safe metrics tracking for pinned buffer pool
+//! utilization. Uses lock-free atomic operations to minimize performance
impact.
+
+use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
+
/// Pool utilization metrics (thread-safe, lock-free design)
///
/// Tracks pool availability, acquire operations, and wait times to diagnose
/// pool starvation issues. Uses atomic operations with Relaxed ordering for
/// minimal performance overhead.
#[derive(Debug)]
pub struct PoolMetrics {
    /// Smallest free-buffer count observed at any acquire; `usize::MAX` until
    /// the first acquire is recorded.
    min_available: AtomicUsize,
    /// Largest free-buffer count observed at any acquire; 0 until recorded.
    max_available: AtomicUsize,
    /// Total number of acquire operations recorded.
    total_acquires: AtomicU64,
    /// Number of times pool was empty when acquiring.
    total_waits: AtomicU64,
    /// Total wait time in nanoseconds across all recorded waits.
    total_wait_time_ns: AtomicU64,
}
+
+impl PoolMetrics {
+ /// Create a new PoolMetrics instance with all counters initialized to
zero.
+ pub fn new() -> Self {
+ Self {
+ min_available: AtomicUsize::new(usize::MAX),
+ max_available: AtomicUsize::new(0),
+ total_acquires: AtomicU64::new(0),
+ total_waits: AtomicU64::new(0),
+ total_wait_time_ns: AtomicU64::new(0),
+ }
+ }
+
+ /// Record an acquire operation with the number of available buffers at
that time.
+ ///
+ /// Uses compare-and-swap loops to ensure atomicity of min/max updates
+ /// and avoid race conditions under concurrent access.
+ pub fn record_acquire(&self, available: usize) {
+ // Update minimum available using a compare-and-swap loop to avoid
races
+ loop {
+ let current_min = self.min_available.load(Ordering::Relaxed);
+ if available >= current_min {
+ break; // Current value is already <= available, no update
needed
+ }
+ match self.min_available.compare_exchange_weak(
+ current_min,
+ available,
+ Ordering::Relaxed,
+ Ordering::Relaxed,
+ ) {
+ Ok(_) => break, // Successfully updated
+ Err(_) => continue, // Value changed, retry
+ }
+ }
+
+ // Update maximum available using a compare-and-swap loop to avoid
races
+ loop {
+ let current_max = self.max_available.load(Ordering::Relaxed);
+ if available <= current_max {
+ break; // Current value is already >= available, no update
needed
+ }
+ match self.max_available.compare_exchange_weak(
+ current_max,
+ available,
+ Ordering::Relaxed,
+ Ordering::Relaxed,
+ ) {
+ Ok(_) => break, // Successfully updated
+ Err(_) => continue, // Value changed, retry
+ }
+ }
+
+ self.total_acquires.fetch_add(1, Ordering::Relaxed);
+ }
+
+ /// Record a wait operation with the wait time in nanoseconds.
+ pub fn record_wait(&self, wait_time_ns: u64) {
+ self.total_waits.fetch_add(1, Ordering::Relaxed);
+ self.total_wait_time_ns
+ .fetch_add(wait_time_ns, Ordering::Relaxed);
+ }
+
+ /// Generate a utilization report from the current metrics.
+ pub fn report(&self) -> PoolUtilizationReport {
+ let acquires = self.total_acquires.load(Ordering::Relaxed);
+ let waits = self.total_waits.load(Ordering::Relaxed);
+ let wait_time_ns = self.total_wait_time_ns.load(Ordering::Relaxed);
+
+ PoolUtilizationReport {
+ min_available: self.min_available.load(Ordering::Relaxed),
+ max_available: self.max_available.load(Ordering::Relaxed),
+ total_acquires: acquires,
+ total_waits: waits,
+ starvation_ratio: if acquires > 0 {
+ waits as f64 / acquires as f64
+ } else {
+ 0.0
+ },
+ avg_wait_time_ns: if waits > 0 { wait_time_ns / waits } else { 0 },
+ }
+ }
+
+ /// Reset all metrics to their initial state.
+ pub fn reset(&self) {
+ self.min_available.store(usize::MAX, Ordering::Relaxed);
+ self.max_available.store(0, Ordering::Relaxed);
+ self.total_acquires.store(0, Ordering::Relaxed);
+ self.total_waits.store(0, Ordering::Relaxed);
+ self.total_wait_time_ns.store(0, Ordering::Relaxed);
+ }
+}
+
+impl Default for PoolMetrics {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
/// Pool utilization report containing aggregated metrics.
///
/// Produced by [`PoolMetrics::report`]; values are a snapshot of the counters
/// at the time of the call.
#[derive(Debug, Clone)]
pub struct PoolUtilizationReport {
    /// Minimum number of buffers available during any acquire operation
    /// (`usize::MAX` if no acquire was ever recorded).
    pub min_available: usize,
    /// Maximum number of buffers available during any acquire operation
    pub max_available: usize,
    /// Total number of acquire operations
    pub total_acquires: u64,
    /// Total number of wait operations (pool was empty)
    pub total_waits: u64,
    /// Ratio of waits to acquires (starvation ratio); 0.0 when no acquires.
    pub starvation_ratio: f64,
    /// Average wait time in nanoseconds; 0 when no waits were recorded.
    pub avg_wait_time_ns: u64,
}
+
+impl PoolUtilizationReport {
+ /// Print a summary of the utilization report to the log.
+ pub fn print_summary(&self) {
+ log::info!(
+ "Pool Utilization: min={}, max={}, acquires={}, waits={},
starvation={:.2}%",
+ self.min_available,
+ self.max_available,
+ self.total_acquires,
+ self.total_waits,
+ self.starvation_ratio * 100.0
+ );
+
+ if self.starvation_ratio > 0.05 {
+ log::warn!(
+ "Pool starvation detected: {:.1}% of acquires had to wait.
Consider increasing pool size.",
+ self.starvation_ratio * 100.0
+ );
+ }
+
+ if self.avg_wait_time_ns > 0 {
+ let avg_wait_ms = self.avg_wait_time_ns as f64 / 1_000_000.0;
+ log::info!("Average wait time: {:.3} ms", avg_wait_ms);
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pool_metrics_new() {
        // A fresh instance reports zeroed counters and the usize::MAX sentinel.
        let report = PoolMetrics::new().report();
        assert_eq!(report.total_acquires, 0);
        assert_eq!(report.total_waits, 0);
        assert_eq!(report.min_available, usize::MAX);
        assert_eq!(report.max_available, 0);
    }

    #[test]
    fn test_pool_metrics_record_acquire() {
        let metrics = PoolMetrics::new();
        for available in [2, 1, 3] {
            metrics.record_acquire(available);
        }

        let report = metrics.report();
        assert_eq!(report.total_acquires, 3);
        assert_eq!(report.min_available, 1);
        assert_eq!(report.max_available, 3);
    }

    #[test]
    fn test_pool_metrics_record_wait() {
        let metrics = PoolMetrics::new();
        for wait_ns in [1_000_000u64, 2_000_000] {
            metrics.record_wait(wait_ns); // 1ms, then 2ms
        }

        let report = metrics.report();
        assert_eq!(report.total_waits, 2);
        assert_eq!(report.avg_wait_time_ns, 1_500_000);
    }

    #[test]
    fn test_pool_metrics_starvation_ratio() {
        let metrics = PoolMetrics::new();
        metrics.record_acquire(2);
        metrics.record_acquire(1);
        metrics.record_wait(1_000_000);

        // One wait over two acquires -> 50% starvation.
        let report = metrics.report();
        assert_eq!(report.total_acquires, 2);
        assert_eq!(report.total_waits, 1);
        assert!((report.starvation_ratio - 0.5).abs() < 1e-10);
    }

    #[test]
    fn test_pool_metrics_reset() {
        let metrics = PoolMetrics::new();
        metrics.record_acquire(2);
        metrics.record_wait(1_000_000);
        metrics.reset();

        // After reset the report must match a freshly constructed instance.
        let report = metrics.report();
        assert_eq!(report.total_acquires, 0);
        assert_eq!(report.total_waits, 0);
        assert_eq!(report.min_available, usize::MAX);
        assert_eq!(report.max_available, 0);
    }
}
diff --git a/qdp/qdp-python/Cargo.toml b/qdp/qdp-python/Cargo.toml
index 6eee0bcf2..44ae30dc7 100644
--- a/qdp/qdp-python/Cargo.toml
+++ b/qdp/qdp-python/Cargo.toml
@@ -11,6 +11,7 @@ crate-type = ["cdylib"]
pyo3 = { version = "0.27", features = ["extension-module"] }
numpy = "0.27"
qdp-core = { path = "../qdp-core" }
+env_logger = "0.11"
[features]
default = []
diff --git a/qdp/qdp-python/src/lib.rs b/qdp/qdp-python/src/lib.rs
index 9bfd1a254..0df6cedc6 100644
--- a/qdp/qdp-python/src/lib.rs
+++ b/qdp/qdp-python/src/lib.rs
@@ -796,6 +796,12 @@ impl QdpEngine {
/// GPU-accelerated quantum data encoding with DLPack integration.
#[pymodule]
fn _qdp(m: &Bound<'_, PyModule>) -> PyResult<()> {
+ // Initialize Rust logging system - respect RUST_LOG environment variable
+ // Ref: https://docs.rs/env_logger/latest/env_logger/
+ // try_init() won't fail if logger is already initialized (e.g., by
another library)
+ // This allows Rust log messages to be visible when RUST_LOG is set
+ let _ = env_logger::Builder::from_default_env().try_init();
+
m.add_class::<QdpEngine>()?;
m.add_class::<QuantumTensor>()?;
Ok(())