featzhang opened a new pull request, #27358:
URL: https://github.com/apache/flink/pull/27358

   ## What is the purpose of the change
   
   This PR adds **comprehensive metrics support** to `AsyncBatchWaitOperator` and `OrderedAsyncBatchWaitOperator` for monitoring AI/ML inference workloads.
   
   The existing async batch operators lack visibility into batch-level performance characteristics that are critical for:
   - Tuning batch sizes for optimal throughput
   - Monitoring inference latency
   - Identifying backpressure and concurrency issues
   - Detecting failures in async operations
   
   ## Brief change log
   
   - Added metrics instrumentation to `AsyncBatchWaitOperator`:
     - `batchSize` - Histogram of batch sizes (records per batch)
     - `batchLatencyMs` - Histogram of batch latency (time from first element to flush)
     - `asyncCallDurationMs` - Histogram of async call duration (invocation to completion)
     - `inflightBatches` - Gauge showing the current number of in-flight async batch operations
     - `totalBatchesProcessed` - Counter of total batches processed
     - `totalRecordsProcessed` - Counter of total records processed
     - `asyncCallFailures` - Counter of failed async calls
   
   - Added the same metrics to `OrderedAsyncBatchWaitOperator`, plus:
     - `pendingOrderedBatches` - Gauge showing batches waiting for in-order emission
   
   - Added comprehensive unit tests for all metrics functionality
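
The metric semantics in the change log above can be illustrated with a minimal, dependency-free sketch. It does not use Flink's metrics API and is not the code in this PR: the class `BatchMetricsSketch`, its method names, and the choice to update the processed counters only on successful completion are assumptions made for illustration. Histograms are modeled as plain lists of samples.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the batch metrics; not the operator's implementation. */
class BatchMetricsSketch {
    // Histograms, modeled as raw sample lists.
    final List<Long> batchSize = new ArrayList<>();
    final List<Long> batchLatencyMs = new ArrayList<>();
    final List<Long> asyncCallDurationMs = new ArrayList<>();

    // Counters and the in-flight gauge, modeled as plain fields.
    long totalBatchesProcessed;
    long totalRecordsProcessed;
    long asyncCallFailures;
    int inflightBatches;

    /**
     * A batch is flushed and the async call is issued.
     * Batch latency is the time from the first buffered element to the flush.
     * Returns the call start time so the caller can report the duration later.
     */
    long onBatchFlushed(int records, long firstElementMs, long nowMs) {
        batchSize.add((long) records);
        batchLatencyMs.add(nowMs - firstElementMs);
        inflightBatches++;
        return nowMs;
    }

    /**
     * The async call finished, successfully or not.
     * Assumption: duration is recorded in both cases, and the processed
     * counters are only incremented on success.
     */
    void onAsyncCallFinished(int records, long callStartMs, long nowMs, boolean failed) {
        inflightBatches--;
        asyncCallDurationMs.add(nowMs - callStartMs);
        if (failed) {
            asyncCallFailures++;
        } else {
            totalBatchesProcessed++;
            totalRecordsProcessed += records;
        }
    }
}
```

In this model, `inflightBatches` rises when a batch is handed to the async function and falls when its result or failure arrives, so a value that stays high suggests the downstream inference service is the bottleneck.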
   
   ## Verifying this change
   
   This change adds tests and can be verified as follows:
   
   - `AsyncBatchWaitOperatorTest#testBatchSizeMetric` - verifies batch size histogram is recorded
   - `AsyncBatchWaitOperatorTest#testBatchAndRecordCounters` - verifies batch and record counters
   - `AsyncBatchWaitOperatorTest#testAsyncCallDurationMetric` - verifies duration tracking
   - `AsyncBatchWaitOperatorTest#testAsyncCallFailureMetric` - verifies failure counter
   - `AsyncBatchWaitOperatorTest#testInflightBatchesTracking` - verifies in-flight gauge
   - `AsyncBatchWaitOperatorTest#testBatchLatencyMetric` - verifies latency histogram
   - `AsyncBatchWaitOperatorTest#testMetricsWithMultipleBatches` - comprehensive multi-batch test
   
   ## Does this pull request potentially affect one of the following parts
   
   - Dependencies (does it add or upgrade a dependency): **no**
   - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
   - The serializers: **no**
   - The runtime per-record code paths (performance sensitive): **yes** (minimal overhead)
   - Anything that affects deployment or recovery: **no**
   - The S3 file system connector: **no**
   
   ## Documentation
   
   - Does this pull request introduce a new feature? **yes**
   - If yes, how is the feature documented? **JavaDocs**
   
   ---
   
   ## PR Series for FLINK-38825
   
   JIRA: [FLINK-38825](https://issues.apache.org/jira/browse/FLINK-38825) - Introduce an AI-friendly Async Batch Operator for high-latency inference workloads
   
   This feature is implemented incrementally through the following PR series:
   
   | # | PR | Title | Description | Module |
   |---|---|---|---|---|
   | 1 | [#27355](https://github.com/apache/flink/pull/27355) | Introduce AsyncBatchFunction and AsyncBatchWaitOperator | Core API and runtime operator with unordered semantics and size-based batch triggering | `flink-streaming-java` |
   | 2 | [#27356](https://github.com/apache/flink/pull/27356) | Add time-based batch triggering | Timeout-based flush to trigger incomplete batches after a configurable duration | `flink-streaming-java` |
   | 3 | [#27357](https://github.com/apache/flink/pull/27357) | Add ordered async batch support | Ordered output semantics via OrderedAsyncBatchWaitOperator with sequence numbers | `flink-streaming-java` |
   | **4** | [#27358](https://github.com/apache/flink/pull/27358) | Add inference-oriented metrics | Batch size/latency histograms, async call duration, inflight batches, failure counters | **`flink-streaming-java`** |
   | 5 | [#27359](https://github.com/apache/flink/pull/27359) | Implement retry and timeout strategies | Fixed-delay/exponential-backoff retry, fail-on-timeout/allow-partial timeout policies | `flink-streaming-java` |
   | 6 | [#27360](https://github.com/apache/flink/pull/27360) | Add SQL/Table API integration | AsyncBatchLookupFunction, AsyncBatchLookupFunctionProvider, lookup join runner | `flink-table` |
   | 7 | [#27361](https://github.com/apache/flink/pull/27361) | Add Python DataStream API integration | Python AsyncBatchFunction, async_invoke_batch() API, AsyncDataStream batch methods | `flink-python` |
   
   > **Note**: Each PR builds incrementally on the previous ones. PRs should be reviewed and merged in order.
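
For the ordered variant from PR 3 and the `pendingOrderedBatches` gauge added in this PR, in-order emission with sequence numbers can be sketched as follows. This is an assumed model, not the operator's implementation: `OrderedEmissionSketch` and its methods are hypothetical, and the gauge is assumed to count batches whose results have arrived but cannot be emitted yet because an earlier batch is still outstanding.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical model of in-order batch emission keyed by sequence number. */
class OrderedEmissionSketch {
    // Completed batches that are still waiting for an earlier sequence number.
    private final TreeMap<Long, List<String>> completed = new TreeMap<>();
    // Sequence number of the next batch that may be emitted.
    private long nextSeq = 0;
    // Results emitted downstream, in order.
    final List<String> emitted = new ArrayList<>();

    /** Buffers a completed batch, then emits every batch that is now in order. */
    void onBatchCompleted(long seq, List<String> results) {
        completed.put(seq, results);
        while (!completed.isEmpty() && completed.firstKey() == nextSeq) {
            emitted.addAll(completed.pollFirstEntry().getValue());
            nextSeq++;
        }
    }

    /** Value the gauge would report under the assumption stated above. */
    int pendingOrderedBatches() {
        return completed.size();
    }
}
```

Under this model, a persistently non-zero `pendingOrderedBatches` points to head-of-line blocking: later batches finish quickly but wait behind one slow batch.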
   

