[DISCUSS] FLINK-38825: Introduce AsyncBatchFunction for High-Throughput
Inference Workloads
Hi Flink Community,
I'd like to start a discussion on *FLINK-38825*, which introduces
batch-oriented asynchronous processing capabilities to Apache Flink. This
feature is specifically designed for high-latency, high-throughput
workloads such as AI/ML model inference.
🎯 Motivation
Flink's current AsyncFunction processes one element at a time, which is
suboptimal for many modern use cases:
1. *Low GPU Utilization*: ML inference frameworks (TensorFlow, PyTorch,
ONNX) achieve optimal throughput with batched inputs, but single-element
async calls underutilize GPUs (typically ~20% utilization)
2. *High Overhead*: External service calls that support batch APIs incur
unnecessary network overhead when invoked per-element
3. *Inefficient Resource Usage*: Many systems (databases, caches,
inference services) can process batches 3-12x more efficiently than
individual requests
💡 Proposed Solution
We propose a new AsyncBatchFunction interface that allows users to process
multiple stream elements in a single async operation:
@PublicEvolving
public interface AsyncBatchFunction<IN, OUT> extends Function, Serializable {

    void asyncInvokeBatch(List<IN> inputs, ResultFuture<OUT> resultFuture) throws Exception;
}
*Key Features:*
- ✅ *Dual Triggering*: Size-based (maxBatchSize) and time-based
(batchTimeout) batch triggering
- ✅ *Ordering Modes*: Both ordered and unordered processing supported
- ✅ *Retry & Timeout*: Configurable retry and timeout strategies (fixed
delay, exponential backoff); see the sketch after this list
- ✅ *Rich Metrics*: 8 inference-specific metrics (batch size,
throughput, latency, etc.)
- ✅ *Full Stack*: Complete integration with DataStream API, Table
API/SQL, and PyFlink
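To make the retry/timeout bullet concrete, here is a sketch of how a retry
strategy could be wired in. It reuses Flink's existing AsyncRetryStrategies and
RetryPredicates (org.apache.flink.streaming.util.retryable); the method
unorderedWaitBatchWithRetry is a hypothetical batch analogue of the existing
unorderedWaitWithRetry and is not final API:

// Sketch only: unorderedWaitBatchWithRetry is a hypothetical batch variant.
AsyncRetryStrategy<Prediction> retryStrategy =
    new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<Prediction>(
            3,     // max attempts
            100L)  // fixed delay between attempts (ms)
        .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
        .build();

DataStream<Prediction> predictions = AsyncDataStream.unorderedWaitBatchWithRetry(
    images,
    new ModelInferenceFunction(),
    32,                    // maxBatchSize
    100,                   // batchTimeout (ms)
    TimeUnit.MILLISECONDS,
    retryStrategy
);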
📊 Performance Improvements
Based on our benchmarks with ML inference workloads:
Batch Size    Throughput Improvement    P99 Latency    GPU Utilization
1 (baseline)  1x                        50ms           ~20%
8             3-4x                      80ms           ~60%
16            5-7x                      120ms          ~80%
32            8-12x                     200ms          ~95%
🔧 API Examples
Java DataStream API:
DataStream<Prediction> predictions = AsyncDataStream.unorderedWaitBatch(
    images,
    new ModelInferenceFunction(),
    32,                    // maxBatchSize
    100,                   // batchTimeout (ms)
    TimeUnit.MILLISECONDS
);
Python API (PyFlink):
predictions = AsyncDataStream.unordered_wait_batch(
    images,
    BatchModelInference(),
    max_batch_size=32,
    batch_timeout=100
)
Table API/SQL:
SELECT a.*, b.features
FROM input_table a
JOIN model_table FOR SYSTEM_TIME AS OF a.proc_time AS b
ON a.id = b.id
-- Automatically uses AsyncBatchLookupFunction
📦 Implementation Status
The feature has been fully implemented across *7 commits* on the
FLINK-38825-python branch:
1. ✅ [5caaa843] Core AsyncBatchFunction and AsyncBatchWaitOperator (+731
lines)
2. ✅ [35e5398c] Time-based batch triggering (+405 lines)
3. ✅ [7ffc0065] Ordered processing support (+1,046 lines)
4. ✅ [0de5e881] Inference-specific metrics (+763 lines)
5. ✅ [ebf8fee7] Retry and timeout strategies (+1,778 lines)
6. ✅ [9baeeb0e] Table API/SQL integration (+1,139 lines)
7. ✅ [83e10731] Python API support (+1,187 lines)
*Total Implementation:* 26 files, ~6,500 lines of code, ~1,886 lines of
tests
🔍 Key Design Points for Discussion
I'd appreciate the community's feedback on the following design aspects:
1. API Design
*Question:* Is the AsyncBatchFunction interface intuitive enough? Should we
provide more convenience methods?
The current design requires users to handle the entire batch in one call.
We considered splitting batch results, but opted for simplicity in v1.
2. Triggering Strategy
*Question:* Should we support adaptive batching based on load/latency?
Current design: Trigger when maxBatchSize is reached OR batchTimeout expires
(whichever comes first). This is simple but static.
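To make the current behavior concrete, here is a rough sketch of the dual
trigger (illustrative pseudologic only; buffer, batchDeadlineMs, and flush()
are placeholders, not the actual AsyncBatchWaitOperator code):

// Illustrative sketch of the size-or-time trigger; not the actual operator code.
private void onElement(IN element, long nowMs) {
    if (buffer.isEmpty()) {
        batchDeadlineMs = nowMs + batchTimeoutMs; // arm the timeout when the batch starts
    }
    buffer.add(element);
    if (buffer.size() >= maxBatchSize) {
        flush(); // size-based trigger
    }
}

private void onProcessingTime(long timerTimeMs) {
    if (!buffer.isEmpty() && timerTimeMs >= batchDeadlineMs) {
        flush(); // time-based trigger: batchTimeout elapsed since the first buffered element
    }
}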
3. Timeout Semantics
*Question:* Which timeout mode should be the default?
We offer two modes:
- BATCH: The timeout applies to the entire batch operation (default)
- ELEMENT: The timeout applies per element, so the effective batch timeout is
element_timeout × batch_size (e.g., a 50 ms element timeout with a batch size
of 32 gives a 1,600 ms batch timeout)
4. Ordering Guarantees
*Question:* Should we add a "best-effort ordering" mode for better
performance?
The ordered variant (orderedWaitBatch) maintains strict order but buffers
results. A relaxed mode could improve throughput.
5. Backpressure Handling
*Question:* How should batch accumulation interact with backpressure?
Current behavior: Continue accumulating until size/timeout triggers. Should
we flush partial batches under high backpressure?
6. Checkpoint Semantics
*Question:* Are the checkpoint semantics correct?
In-flight batches are currently buffered in operator state during
checkpointing. This ensures exactly-once but increases checkpoint size.
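For reference, here is my assumption of how that buffering could look, using
standard operator ListState (a sketch, not necessarily the code on the branch;
bufferedElementsState, buffer, and inputSerializer are placeholders):

// Sketch only: snapshot/restore of in-flight batch elements via operator ListState.
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);
    bufferedElementsState.update(new ArrayList<>(buffer)); // persist elements not yet emitted
}

@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    bufferedElementsState = context.getOperatorStateStore()
        .getListState(new ListStateDescriptor<>("buffered-batch-elements", inputSerializer));
    for (IN element : bufferedElementsState.get()) {
        buffer.add(element); // re-buffer elements that were in flight at checkpoint time
    }
}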
📚 Documentation
Full FLIP documents are available on the branch:
- *JIRA:* FLINK-38825 <https://issues.apache.org/jira/browse/FLINK-38825>
- *Branch:* FLINK-38825-python (based on master)
🎯 Use Cases Beyond ML Inference
While designed for ML inference, this feature benefits:
- *Batch Translation APIs*: Google Translate, DeepL batch endpoints
- *Geocoding Services*: Batch address → coordinates conversion
- *Database Bulk Operations*: Batch inserts/updates to ClickHouse,
Cassandra
- *Feature Stores*: Batch feature retrieval (Feast, Tecton)
- *Cache Warming*: Batch Redis/Memcached operations
🤝 Request for Feedback
I'd love to hear your thoughts on:
1. *API Design*: Is it intuitive? Any missing convenience methods?
2. *Default Configuration*: What should be the default batch
size/timeout?
3. *Performance Trade-offs*: Acceptable latency increase for throughput
gains?
4. *Integration Points*: Any concerns with Table API or Python API
design?
5. *Testing Strategy*: Are there specific scenarios we should benchmark?
6. *Documentation*: What additional examples would be helpful?
⏰ Timeline
If the community is supportive, I'd like to:
- *Week 1-2*: Address feedback and refine design
- *Week 3*: Create official FLIP on Confluence
- *Week 4+*: Code review and merge preparation
Looking forward to your feedback and suggestions!
*Best regards,*
Feat Zhang