shibd commented on code in PR #25322:
URL: https://github.com/apache/pulsar/pull/25322#discussion_r2946108178
##########
pip/pip-461.md:
##########
@@ -0,0 +1,231 @@
+# PIP-461: Add queued latency metrics for offloader executors
+
+## Background knowledge
+
+Apache Pulsar tiered storage offloaders move ledger data from BookKeeper into remote storage and read it back when
+entries are no longer available locally. The jcloud-based implementation,
+`BlobStoreManagedLedgerOffloader`, uses two `OrderedScheduler` instances:
+
+- `scheduler` for offload, streaming offload, and delete tasks
+- `readExecutor` for opening offloaded ledgers and serving subsequent read/close work
+
+These tasks are not lightweight dispatch operations. They perform blocking remote-storage work such as multipart upload,
+blob fetches, index reads, and stream reads. When many offload or read tasks are submitted concurrently, later tasks can
+wait in the executor queue for a meaningful amount of time before they begin execution.
+
+Pulsar already exposes a set of offloader metrics through `LedgerOffloaderStats`, such as offload bytes, read bytes,
+read latency, and storage errors. Those metrics help identify remote-storage throughput and failures, but they do not
+show whether tiered-storage requests are spending time waiting in the offloader executors before any I/O starts.
+
+## Motivation
+
+Today, an operator can observe that tiered storage reads or writes are slow, but cannot tell whether the delay is caused
+by:
+
+1. remote storage latency after a task starts running, or
+2. queueing before the task gets a thread on the offloader executor.
+
+This distinction matters in practice:
+
+- long queueing on `scheduler` can delay offload completion and blob deletion
+- long queueing on `readExecutor` can delay opening offloaded ledgers and serving reads from tiered storage
+- the remediation is different for queue saturation than for remote storage slowness
+
+Without dedicated queued-latency metrics, operators must infer saturation indirectly from logs or generic executor
+behavior. That makes diagnosis slower and less reliable, especially when trying to distinguish write-path pressure from
+read-path pressure in tiered storage.
+
+## Goals
+
+### In Scope
+
+- Add one metric for queued latency on the offload/write executor path
+- Add one metric for queued latency on the offload/read executor path
+- Record the metrics in the jcloud offloader at the point where tasks are submitted to and begin running on the relevant
+  executors
+- Keep the metrics aligned with the existing `LedgerOffloaderStats` topic/namespace labeling model
+
+### Out of Scope
+
+- Changing the thread counts or scheduling policy of offloader executors
+- Reworking `OrderedScheduler`
+- Adding queue-depth metrics or executor occupancy metrics
+- Instrumenting every offloader implementation in the same PIP
+- Measuring intentional timer delays introduced by `schedule(...)` calls
+
+## High Level Design
+
+This proposal adds two new summary metrics to `LedgerOffloaderStats`:
+
+- `brk_ledgeroffloader_offload_executor_queue_latency`
+- `brk_ledgeroffloader_read_offload_executor_queue_latency`
+
+The metrics are recorded by capturing the enqueue time when a task is submitted and observing the elapsed time when the
+task actually begins running on the executor thread.
+
+At a high level:
+
+1. When `BlobStoreManagedLedgerOffloader` submits blocking work to `scheduler`, Pulsar wraps the task and records the
+   queued time into the offload queue metric when execution starts.
+2. When `BlobStoreManagedLedgerOffloader` submits blocking open/read work to `readExecutor`, Pulsar wraps the task and
+   records the queued time into the read queue metric when execution starts.
+3. The offloaded read-handle implementations also wrap later `readAsync()` and `closeAsync()` tasks, so the metric
+   covers the full read lifecycle, not only the initial `readOffloaded()` open call.
+
+This design intentionally measures only queue wait. Once execution begins, the existing offloader latency metrics remain
+responsible for capturing remote-storage and ledger-read timing.
+
+## Detailed Design
+
+### Design & Implementation Details
+
+The implementation adds two new methods to `LedgerOffloaderStats`:
+
+- `recordOffloadExecutorQueueLatency`
+- `recordReadOffloadExecutorQueueLatency`
+
+`LedgerOffloaderStatsImpl` registers two new Prometheus `Summary` instances and records queued time in microseconds,
+matching the unit conversion style used by existing offloader latency metrics.
+
+The jcloud implementation introduces a small helper that wraps submitted tasks:
+
+1. capture `System.nanoTime()` at submission time
+2. when the wrapped task begins execution, compute `System.nanoTime() - enqueueTime`
+3. record the value into the corresponding offloader queue metric
+4. run the original task body
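
For illustration only, the wrapping helper described above could look like the following sketch (class and method names here are hypothetical, not the final API in the PR):

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

public class QueueLatencyWrapper {
    // Hypothetical sketch of the wrapping helper: reports the time between
    // submission and execution start, then runs the original task body.
    public static Runnable wrap(Runnable task, LongConsumer recordMicros) {
        final long enqueuedAt = System.nanoTime(); // step 1: captured at submission
        return () -> {
            // step 2: elapsed queue time, computed when execution starts
            long queuedNanos = System.nanoTime() - enqueuedAt;
            // step 3: record in microseconds, matching existing offloader metrics
            recordMicros.accept(TimeUnit.NANOSECONDS.toMicros(queuedNanos));
            // step 4: run the original task body
            task.run();
        };
    }
}
```

The offloader would pass a recorder bound to the appropriate metric, e.g. the offload queue summary for `scheduler` submissions and the read queue summary for `readExecutor` submissions.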
+
+The helper is used in these paths:
+
+- `BlobStoreManagedLedgerOffloader.offload(...)`
+- `BlobStoreManagedLedgerOffloader.streamingOffload(...)`
+- `BlobStoreManagedLedgerOffloader.readOffloaded(...)`
+- `BlobStoreManagedLedgerOffloader.deleteOffloaded(...)`
+- `BlobStoreBackedReadHandleImpl.readAsync(...)`
+- `BlobStoreBackedReadHandleImpl.closeAsync(...)`
+- `BlobStoreBackedReadHandleImplV2.readAsync(...)`
+- `BlobStoreBackedReadHandleImplV2.closeAsync(...)`
+
+The metric uses the managed-ledger topic name when available. If a managed-ledger name is absent, the metric falls back
+to the existing `"unknown"` label behavior already used by `LedgerOffloaderStatsImpl`.
+
+Delayed tasks submitted with `schedule(...)` are intentionally excluded from this metric because their delay is part of
+the scheduling logic, not executor saturation.
+
+### Public-facing Changes
+
+### Public API
+
+No client-facing public API changes are introduced.
+
+### Binary protocol
+
+No binary protocol changes.
+
+### Configuration
+
+No configuration changes.
+
+### CLI
+
+No CLI changes.
+
+### Metrics
+
+1. `brk_ledgeroffloader_offload_executor_queue_latency`
+   - Description: Time that a blocking tiered-storage offload/write/delete task spends waiting in the offloader
+ executor queue before starting execution
+ - Attributes:
+ - `namespace`
+ - `topic` when topic-level offloader metrics are enabled
+ - Unit: microseconds
+
+2. `brk_ledgeroffloader_read_offload_executor_queue_latency`
+   - Description: Time that a blocking tiered-storage read task spends waiting in the offloader read executor queue
+ before starting execution
+ - Attributes:
Review Comment:
Maybe it could be helpful to add one more attribute here, something like
`operation` (or `caller`). These executors seem to handle more than just one
kind of work, so separating values such as `offload`, `delete`, `open`, `read`,
and `close` might make the queue-latency metrics easier to interpret in
practice. For example, it would be easier to tell whether the latency is coming
from read traffic itself or from extra work scheduled on the same executor.
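
As a rough sketch of what a per-`operation` split could look like (all names here are hypothetical, not from the PR; a real implementation would presumably add the label to the Prometheus `Summary` rather than aggregate by hand):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

public class PerOperationQueueStats {
    // Hypothetical sketch: aggregate queued time per operation label,
    // e.g. "offload", "delete", "open", "read", "close".
    private final Map<String, LongAdder> totalMicros = new ConcurrentHashMap<>();
    private final Map<String, LongAdder> counts = new ConcurrentHashMap<>();

    public void record(String operation, long micros) {
        totalMicros.computeIfAbsent(operation, k -> new LongAdder()).add(micros);
        counts.computeIfAbsent(operation, k -> new LongAdder()).increment();
    }

    // Average queued time for one operation, 0.0 if never recorded.
    public double averageMicros(String operation) {
        long n = counts.getOrDefault(operation, new LongAdder()).sum();
        return n == 0 ? 0.0 : (double) totalMicros.get(operation).sum() / n;
    }
}
```

A high average for `open` relative to `read` on the same executor, for example, would suggest open traffic queuing behind in-flight reads rather than slow reads themselves.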
##########
pip/pip-461.md:
##########
@@ -0,0 +1,231 @@
+# PIP-461: Add queued latency metrics for offloader executors
+
+## Background knowledge
+
+Apache Pulsar tiered storage offloaders move ledger data from BookKeeper into remote storage and read it back when
+entries are no longer available locally. The jcloud-based implementation,
+`BlobStoreManagedLedgerOffloader`, uses two `OrderedScheduler` instances:
+
+- `scheduler` for offload, streaming offload, and delete tasks
+- `readExecutor` for opening offloaded ledgers and serving subsequent read/close work
+
+These tasks are not lightweight dispatch operations. They perform blocking remote-storage work such as multipart upload,
+blob fetches, index reads, and stream reads. When many offload or read tasks are submitted concurrently, later tasks can
+wait in the executor queue for a meaningful amount of time before they begin execution.
+
+Pulsar already exposes a set of offloader metrics through `LedgerOffloaderStats`, such as offload bytes, read bytes,
+read latency, and storage errors. Those metrics help identify remote-storage throughput and failures, but they do not
+show whether tiered-storage requests are spending time waiting in the offloader executors before any I/O starts.
+
+## Motivation
+
+Today, an operator can observe that tiered storage reads or writes are slow, but cannot tell whether the delay is caused
+by:
+
+1. remote storage latency after a task starts running, or
+2. queueing before the task gets a thread on the offloader executor.
+
+This distinction matters in practice:
+
+- long queueing on `scheduler` can delay offload completion and blob deletion
+- long queueing on `readExecutor` can delay opening offloaded ledgers and serving reads from tiered storage
+- the remediation is different for queue saturation than for remote storage slowness
+
+Without dedicated queued-latency metrics, operators must infer saturation indirectly from logs or generic executor
+behavior. That makes diagnosis slower and less reliable, especially when trying to distinguish write-path pressure from
+read-path pressure in tiered storage.
+
+## Goals
+
+### In Scope
+
+- Add one metric for queued latency on the offload/write executor path
+- Add one metric for queued latency on the offload/read executor path
+- Record the metrics in the jcloud offloader at the point where tasks are submitted to and begin running on the relevant
+  executors
+- Keep the metrics aligned with the existing `LedgerOffloaderStats` topic/namespace labeling model
+
+### Out of Scope
+
+- Changing the thread counts or scheduling policy of offloader executors
+- Reworking `OrderedScheduler`
+- Adding queue-depth metrics or executor occupancy metrics
+- Instrumenting every offloader implementation in the same PIP
+- Measuring intentional timer delays introduced by `schedule(...)` calls
+
+## High Level Design
+
+This proposal adds two new summary metrics to `LedgerOffloaderStats`:
+
+- `brk_ledgeroffloader_offload_executor_queue_latency`
+- `brk_ledgeroffloader_read_offload_executor_queue_latency`
+
+The metrics are recorded by capturing the enqueue time when a task is submitted and observing the elapsed time when the
+task actually begins running on the executor thread.
+
+At a high level:
+
+1. When `BlobStoreManagedLedgerOffloader` submits blocking work to `scheduler`, Pulsar wraps the task and records the
+   queued time into the offload queue metric when execution starts.
+2. When `BlobStoreManagedLedgerOffloader` submits blocking open/read work to `readExecutor`, Pulsar wraps the task and
+   records the queued time into the read queue metric when execution starts.
+3. The offloaded read-handle implementations also wrap later `readAsync()` and `closeAsync()` tasks, so the metric
+   covers the full read lifecycle, not only the initial `readOffloaded()` open call.
+
+This design intentionally measures only queue wait. Once execution begins, the existing offloader latency metrics remain
+responsible for capturing remote-storage and ledger-read timing.
+
+## Detailed Design
+
+### Design & Implementation Details
+
+The implementation adds two new methods to `LedgerOffloaderStats`:
+
+- `recordOffloadExecutorQueueLatency`
+- `recordReadOffloadExecutorQueueLatency`
+
+`LedgerOffloaderStatsImpl` registers two new Prometheus `Summary` instances and records queued time in microseconds,
+matching the unit conversion style used by existing offloader latency metrics.
+
+The jcloud implementation introduces a small helper that wraps submitted tasks:
+
+1. capture `System.nanoTime()` at submission time
+2. when the wrapped task begins execution, compute `System.nanoTime() - enqueueTime`
+3. record the value into the corresponding offloader queue metric
+4. run the original task body
+
+The helper is used in these paths:
+
+- `BlobStoreManagedLedgerOffloader.offload(...)`
+- `BlobStoreManagedLedgerOffloader.streamingOffload(...)`
Review Comment:
Maybe we can leave `streamingOffload()` out of scope for now, since this
path does not seem to be wired into the current production offload flow yet. If
you would still like to include it here, could we also note that there is an
intentional delayed resubmission in
`BlobStoreManagedLedgerOffloader.streamingOffloadLoop()` and subtract that
delay from the queue-latency metric?
```java
scheduler.chooseThread(segmentInfo)
.schedule(() -> {
streamingOffloadLoop(partId, dataObjectLength);
}, 100, TimeUnit.MILLISECONDS);
```
Otherwise the metric may end up including this built-in 100ms delay, instead
of only the actual executor queue time.
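
One possible way to implement that subtraction, sketched under the assumption that the requested delay is known at the call site (all names here are hypothetical):

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

public class DelayedQueueLatency {
    // Hypothetical sketch: for tasks scheduled with an intentional delay,
    // report only the time beyond the requested delay as queue latency.
    public static Runnable wrapScheduled(Runnable task, long delay, TimeUnit unit,
                                         LongConsumer recordMicros) {
        final long expectedStart = System.nanoTime() + unit.toNanos(delay);
        return () -> {
            // Clamp at zero: the timer may fire exactly on time or marginally early.
            long excessNanos = Math.max(0, System.nanoTime() - expectedStart);
            recordMicros.accept(TimeUnit.NANOSECONDS.toMicros(excessNanos));
            task.run();
        };
    }
}
```

Clamping at zero would keep the metric's meaning consistent with the non-delayed paths: it reports only waiting beyond what was intentionally requested, so the built-in 100ms resubmission delay never shows up as queue time.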
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]