[DISCUSS] FLINK-38825: Introduce AsyncBatchFunction for High-Throughput
Inference Workloads

Hi Flink Community,

I'd like to start a discussion on *FLINK-38825*, which introduces
batch-oriented asynchronous processing capabilities to Apache Flink. This
feature is specifically designed for high-latency, high-throughput
workloads such as AI/ML model inference.

🎯 Motivation

Flink's current AsyncFunction processes one element at a time, which is
suboptimal for many modern use cases:

   1. *Low GPU Utilization*: ML inference frameworks (TensorFlow, PyTorch,
   ONNX) achieve optimal throughput with batched inputs, but single-element
   async calls underutilize GPUs (typically ~20% utilization)
   2. *High Overhead*: External service calls that support batch APIs incur
   unnecessary network overhead when invoked per-element
   3. *Inefficient Resource Usage*: Many systems (databases, caches,
   inference services) can process batches 3-12x more efficiently than
   individual requests

💡 Proposed Solution

We propose a new AsyncBatchFunction interface that allows users to process
multiple stream elements in a single async operation:

@PublicEvolving
public interface AsyncBatchFunction<IN, OUT> extends Function, Serializable {
    /** Processes one accumulated batch asynchronously and completes resultFuture with the results. */
    void asyncInvokeBatch(List<IN> inputs, ResultFuture<OUT> resultFuture) throws Exception;
}
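
To make the contract concrete, here is a minimal implementation sketch; the
EmbeddingClient, Embedding type, and the queryBatch call are hypothetical
stand-ins for any non-blocking client that accepts a batched request:

public class EmbeddingBatchFunction implements AsyncBatchFunction<String, Embedding> {

    // Hypothetical async client returning CompletableFuture<List<Embedding>>.
    private transient EmbeddingClient client;

    @Override
    public void asyncInvokeBatch(List<String> inputs, ResultFuture<Embedding> resultFuture) {
        client.queryBatch(inputs)                        // one call for the whole batch
              .whenComplete((embeddings, error) -> {
                  if (error != null) {
                      resultFuture.completeExceptionally(error);
                  } else {
                      resultFuture.complete(embeddings); // results for all batched inputs
                  }
              });
    }
}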

*Key Features:*

   - ✅ *Dual Triggering*: Size-based (maxBatchSize) and time-based
   (batchTimeout) batch triggering
   - ✅ *Ordering Modes*: Both ordered and unordered processing supported
   - ✅ *Retry & Timeout*: Configurable retry/timeout strategies (fixed
   delay, exponential backoff); see the configuration sketch after this list
   - ✅ *Rich Metrics*: 8 inference-specific metrics (batch size,
   throughput, latency, etc.)
   - ✅ *Full Stack*: Complete integration with DataStream API, Table
   API/SQL, and PyFlink
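
As a configuration sketch only: the retry hooks could reuse the
AsyncRetryStrategy builders that Flink's existing
AsyncDataStream.unorderedWaitWithRetry already accepts. The batch-aware
entry point unorderedWaitBatchWithRetry shown below is hypothetical and
part of what is up for discussion:

// Retry strategy built with Flink's existing AsyncRetryStrategies utilities.
AsyncRetryStrategy retryStrategy =
    new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // 3 attempts, 100 ms delay
        .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
        .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
        .build();

// Hypothetical batch counterpart of today's unorderedWaitWithRetry.
DataStream<Prediction> predictions = AsyncDataStream.unorderedWaitBatchWithRetry(
    images,
    new ModelInferenceFunction(),
    32,                      // maxBatchSize
    100,                     // batchTimeout (ms)
    TimeUnit.MILLISECONDS,
    retryStrategy);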

📊 Performance Improvements

Based on our benchmarks with ML inference workloads:
Batch Size    Throughput Improvement    P99 Latency    GPU Utilization
1 (baseline)  1x                        50 ms          ~20%
8             3-4x                      80 ms          ~60%
16            5-7x                      120 ms         ~80%
32            8-12x                     200 ms         ~95%

🔧 API Examples

Java DataStream API:

DataStream<Prediction> predictions = AsyncDataStream.unorderedWaitBatch(
    images,
    new ModelInferenceFunction(),
    32,   // maxBatchSize
    100,  // batchTimeout (ms)
    TimeUnit.MILLISECONDS
);

Python API (PyFlink):

predictions = AsyncDataStream.unordered_wait_batch(
    images,
    BatchModelInference(),
    max_batch_size=32,
    batch_timeout=100
)

Table API/SQL:

SELECT a.*, b.features
FROM input_table a
JOIN model_table FOR SYSTEM_TIME AS OF a.proc_time AS b
ON a.id = b.id
-- Automatically uses AsyncBatchLookupFunction

📦 Implementation Status

The feature has been fully implemented across *7 commits* on the
FLINK-38825-python branch:

   1. ✅ [5caaa843] Core AsyncBatchFunction and AsyncBatchWaitOperator (+731
   lines)
   2. ✅ [35e5398c] Time-based batch triggering (+405 lines)
   3. ✅ [7ffc0065] Ordered processing support (+1,046 lines)
   4. ✅ [0de5e881] Inference-specific metrics (+763 lines)
   5. ✅ [ebf8fee7] Retry and timeout strategies (+1,778 lines)
   6. ✅ [9baeeb0e] Table API/SQL integration (+1,139 lines)
   7. ✅ [83e10731] Python API support (+1,187 lines)

*Total Implementation:* 26 files, ~6,500 lines of code, ~1,886 lines of
tests

🔍 Key Design Points for Discussion

I'd appreciate the community's feedback on the following design aspects:

1. API Design

*Question:* Is the AsyncBatchFunction interface intuitive enough? Should we
provide more convenience methods?

The current design requires users to handle the entire batch in one call.
We considered splitting batch results, but opted for simplicity in v1.

2. Triggering Strategy

*Question:* Should we support adaptive batching based on load/latency?

Current design: Trigger when maxBatchSize is reached OR batchTimeout expires
(whichever comes first). This is simple but static.
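
For illustration only, the intended trigger logic in simplified form (plain
Java pseudo-code; none of these names are taken from the actual operator):

// Size OR time, whichever comes first.
void onElement(IN element) {
    buffer.add(element);
    if (buffer.size() == 1) {
        // First element of a new batch: arm the batch timeout timer.
        timerService.registerTimer(now() + batchTimeoutMs, this::onBatchTimeout);
    }
    if (buffer.size() >= maxBatchSize) {
        fireBatch();                      // size threshold reached
    }
}

void onBatchTimeout() {
    if (!buffer.isEmpty()) {
        fireBatch();                      // timeout reached with a partial batch
    }
}

void fireBatch() {
    List<IN> batch = new ArrayList<>(buffer);
    buffer.clear();
    userFunction.asyncInvokeBatch(batch, resultFutureFor(batch));
}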

3. Timeout Semantics

*Question:* Which timeout mode should be the default?

We offer two modes:

   - BATCH: Timeout applies to entire batch operation (default)
   - ELEMENT: Timeout per element (batch timeout = element_timeout ×
   batch_size)

4. Ordering Guarantees

*Question:* Should we add a "best-effort ordering" mode for better
performance?

The ordered variant (orderedWaitBatch) maintains strict order but buffers
results. A relaxed mode could improve throughput.

5. Backpressure Handling

*Question:* How should batch accumulation interact with backpressure?

Current behavior: Continue accumulating until size/timeout triggers. Should
we flush partial batches under high backpressure?

6. Checkpoint Semantics

*Question:* Are the checkpoint semantics correct?

In-flight batches are currently buffered in operator state during
checkpointing. This ensures exactly-once but increases checkpoint size.
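
One plausible shape of that buffering, sketched with Flink's operator-state
API (field and descriptor names here are assumptions, not the actual
implementation):

// Elements accumulated but not yet completed are persisted so that a
// restored job can rebuild and re-submit the batch (exactly-once).
private transient ListState<IN> pendingInputsState;

public void initializeState(FunctionInitializationContext context) throws Exception {
    pendingInputsState = context.getOperatorStateStore()
        .getListState(new ListStateDescriptor<>("pending-batch-inputs", inputTypeInfo));
    for (IN element : pendingInputsState.get()) {
        buffer.add(element);             // re-accumulate in-flight elements on restore
    }
}

public void snapshotState(FunctionSnapshotContext context) throws Exception {
    pendingInputsState.update(buffer);   // snapshot the current in-flight/partial batch
}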

📚 Documentation

Full FLIP documents are available on the branch:

   - *JIRA:* FLINK-38825 <https://issues.apache.org/jira/browse/FLINK-38825>
   - *Branch:* FLINK-38825-python (based on master)

🎯 Use Cases Beyond ML Inference

While designed for ML inference, this feature benefits:

   - *Batch Translation APIs*: Google Translate, DeepL batch endpoints
   - *Geocoding Services*: Batch address → coordinates conversion
   - *Database Bulk Operations*: Batch inserts/updates to ClickHouse,
   Cassandra
   - *Feature Stores*: Batch feature retrieval (Feast, Tecton)
   - *Cache Warming*: Batch Redis/Memcached operations

🤝 Request for Feedback

I'd love to hear your thoughts on:

   1. *API Design*: Is it intuitive? Any missing convenience methods?
   2. *Default Configuration*: What should be the default batch
   size/timeout?
   3. *Performance Trade-offs*: Acceptable latency increase for throughput
   gains?
   4. *Integration Points*: Any concerns with Table API or Python API
   design?
   5. *Testing Strategy*: Are there specific scenarios we should benchmark?
   6. *Documentation*: What additional examples would be helpful?

⏰ Timeline

If the community is supportive, I'd like to:

   - *Week 1-2*: Address feedback and refine design
   - *Week 3*: Create official FLIP on Confluence
   - *Week 4+*: Code review and merge preparation

Looking forward to your feedback and suggestions!

*Best regards,*
Feat Zhang
