Hello,

In a similar but tangential vein, I think it would be interesting to add support for AsyncScalarFunction in batch. If it is possible to reuse the current interface, or to add a mixin for batch, that would be great: the same function could then be reused in batch and streaming (with maybe an additional method). We could then add the ExecNode in the Calcite-based planner for AsyncScalarFunction in batch.
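Roughly what I have in mind, purely as a sketch (none of these names exist today; the shape of the method is illustrative only):

import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative mixin sketch: an AsyncScalarFunction implementation could
// additionally implement this to opt into bulk invocation in batch jobs,
// while the existing per-row eval(...) path stays unchanged for streaming.
public interface BatchAsyncScalarCapability {

    // A batch ExecNode would buffer a page of rows and issue one call,
    // completing the future with exactly one result per input row, in order.
    void evalBatch(List<Object[]> argsPerRow, CompletableFuture<List<Object>> results);
}

The batch ExecNode could detect that capability and buffer rows, while the streaming planner keeps calling eval() exactly as it does today.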
A similar idea, but a different approach.

Ryan van Huuksloot
Staff Engineer, Infrastructure | Streaming Platform
Shopify

On Wed, Feb 4, 2026 at 9:17 AM FeatZhang <[email protected]> wrote:

[DISCUSS] FLINK-38825: Introduce AsyncBatchFunction for High-Throughput Inference Workloads

Hi Flink Community,

I'd like to start a discussion on *FLINK-38825*, which introduces batch-oriented asynchronous processing capabilities to Apache Flink. This feature is specifically designed for high-latency, high-throughput workloads such as AI/ML model inference.

🎯 Motivation

Flink's current AsyncFunction processes one element at a time, which is suboptimal for many modern use cases:

1. *Low GPU Utilization*: ML inference frameworks (TensorFlow, PyTorch, ONNX) achieve optimal throughput with batched inputs, but single-element async calls underutilize GPUs (typically ~20% utilization)
2. *High Overhead*: External service calls that support batch APIs incur unnecessary network overhead when invoked per element
3. *Inefficient Resource Usage*: Many systems (databases, caches, inference services) can process batches 3-12x more efficiently than individual requests

💡 Proposed Solution

We propose a new AsyncBatchFunction interface that allows users to process multiple stream elements in a single async operation:

@PublicEvolving
public interface AsyncBatchFunction<IN, OUT> extends Function, Serializable {
    void asyncInvokeBatch(List<IN> inputs, ResultFuture<OUT> resultFuture) throws Exception;
}

*Key Features:*

- ✅ *Dual Triggering*: Size-based (maxBatchSize) and time-based (batchTimeout) batch triggering
- ✅ *Ordering Modes*: Both ordered and unordered processing supported
- ✅ *Retry & Timeout*: Configurable strategies (fixed delay, exponential backoff)
- ✅ *Rich Metrics*: 8 inference-specific metrics (batch size, throughput, latency, etc.)
- ✅ *Full Stack*: Complete integration with DataStream API, Table API/SQL, and PyFlink

📊 Performance Improvements

Based on our benchmarks with ML inference workloads:

Batch Size     Throughput Improvement   P99 Latency   GPU Utilization
1 (baseline)   1x                       50ms          ~20%
8              3-4x                     80ms          ~60%
16             5-7x                     120ms         ~80%
32             8-12x                    200ms         ~95%

🔧 API Examples

Java DataStream API:

DataStream<Prediction> predictions = AsyncDataStream.unorderedWaitBatch(
    images,
    new ModelInferenceFunction(),
    32,   // maxBatchSize
    100,  // batchTimeout (ms)
    TimeUnit.MILLISECONDS
);

Python API (PyFlink):

predictions = AsyncDataStream.unordered_wait_batch(
    images,
    BatchModelInference(),
    max_batch_size=32,
    batch_timeout=100
)

Table API/SQL:

SELECT a.*, b.features
FROM input_table a
JOIN model_table FOR SYSTEM_TIME AS OF a.proc_time AS b
ON a.id = b.id
-- Automatically uses AsyncBatchLookupFunction
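For illustration, here is a minimal sketch of what a user function could look like against the proposed interface. Image, Prediction, and the batched InferenceClient are hypothetical placeholders; the only proposed surface is asyncInvokeBatch plus the existing ResultFuture:

import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.streaming.api.functions.async.ResultFuture;

// Sketch only: one async round trip per batch instead of one per element.
public class ModelInferenceFunction implements AsyncBatchFunction<Image, Prediction> {

    // Hypothetical client for a service that accepts batched requests
    // (HTTP, gRPC, Triton, ...); it would be created up front in a real job.
    private transient InferenceClient client;

    @Override
    public void asyncInvokeBatch(List<Image> inputs, ResultFuture<Prediction> resultFuture) {
        CompletableFuture<List<Prediction>> response = client.predictBatch(inputs);

        response.whenComplete((predictions, error) -> {
            if (error != null) {
                resultFuture.completeExceptionally(error);
            } else {
                // One result per input; ordering across batches is handled by the operator.
                resultFuture.complete(predictions);
            }
        });
    }
}

This is what new ModelInferenceFunction() stands in for in the DataStream example above.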
📦 Implementation Status

The feature has been fully implemented across *7 commits* on the FLINK-38825-python branch:

1. ✅ [5caaa843] Core AsyncBatchFunction and AsyncBatchWaitOperator (+731 lines)
2. ✅ [35e5398c] Time-based batch triggering (+405 lines)
3. ✅ [7ffc0065] Ordered processing support (+1,046 lines)
4. ✅ [0de5e881] Inference-specific metrics (+763 lines)
5. ✅ [ebf8fee7] Retry and timeout strategies (+1,778 lines)
6. ✅ [9baeeb0e] Table API/SQL integration (+1,139 lines)
7. ✅ [83e10731] Python API support (+1,187 lines)

*Total Implementation:* 26 files, ~6,500 lines of code, ~1,886 lines of tests

🔍 Key Design Points for Discussion

I'd appreciate the community's feedback on the following design aspects:

1. API Design

*Question:* Is the AsyncBatchFunction interface intuitive enough? Should we provide more convenience methods?

The current design requires users to handle the entire batch in one call. We considered splitting batch results, but opted for simplicity in v1.

2. Triggering Strategy

*Question:* Should we support adaptive batching based on load/latency?

Current design: Trigger when maxBatchSize is reached OR batchTimeout expires (whichever comes first). This is simple but static.

3. Timeout Semantics

*Question:* Which timeout mode should be the default?

We offer two modes:

- BATCH: Timeout applies to the entire batch operation (default)
- ELEMENT: Timeout per element (batch timeout = element_timeout × batch_size)

4. Ordering Guarantees

*Question:* Should we add a "best-effort ordering" mode for better performance?

The ordered variant (orderedWaitBatch) maintains strict order but buffers results. A relaxed mode could improve throughput.

5. Backpressure Handling

*Question:* How should batch accumulation interact with backpressure?

Current behavior: Continue accumulating until size/timeout triggers. Should we flush partial batches under high backpressure?

6. Checkpoint Semantics

*Question:* Are the checkpoint semantics correct?

In-flight batches are currently buffered in operator state during checkpointing. This ensures exactly-once but increases checkpoint size.

📚 Documentation

Full FLIP documents are available on the branch:

- *JIRA:* FLINK-38825 <https://issues.apache.org/jira/browse/FLINK-38825>
- *Branch:* FLINK-38825-python (based on master)

🎯 Use Cases Beyond ML Inference

While designed for ML inference, this feature benefits:

- *Batch Translation APIs*: Google Translate, DeepL batch endpoints
- *Geocoding Services*: Batch address → coordinates conversion
- *Database Bulk Operations*: Batch inserts/updates to ClickHouse, Cassandra
- *Feature Stores*: Batch feature retrieval (Feast, Tecton)
- *Cache Warming*: Batch Redis/Memcached operations

🤝 Request for Feedback

I'd love to hear your thoughts on:

1. *API Design*: Is it intuitive? Any missing convenience methods?
2. *Default Configuration*: What should be the default batch size/timeout?
3. *Performance Trade-offs*: Acceptable latency increase for throughput gains?
4. *Integration Points*: Any concerns with Table API or Python API design?
5. *Testing Strategy*: Are there specific scenarios we should benchmark?
6. *Documentation*: What additional examples would be helpful?

⏰ Timeline

If the community is supportive, I'd like to:

- *Week 1-2*: Address feedback and refine design
- *Week 3*: Create official FLIP on Confluence
- *Week 4+*: Code review and merge preparation

Looking forward to your feedback and suggestions!

*Best regards,*
Feat Zhang
