Hello,

In a similar vein but tangentially, I think it would be interesting to add
support for the AsyncScalarFunction in Batch. If it is possible to reuse
the current interface or add a mixin for batch that would be great so you
can reuse the same function in batch/streaming (with maybe an additional
method). Then we can add the ExecNode in the calcite parser for
AsyncScalarFunction for batch.

Similar idea but a different approach.

Ryan van Huuksloot
Staff Engineer, Infrastructure | Streaming Platform
[image: Shopify]
<https://www.shopify.com/?utm_medium=salessignatures&utm_source=hs_email>


On Wed, Feb 4, 2026 at 9:17 AM FeatZhang <[email protected]> wrote:

> [DISCUSS] FLINK-38825: Introduce AsyncBatchFunction for High-Throughput
> Inference Workloads
>
> Hi Flink Community,
>
> I'd like to start a discussion on *FLINK-38825*, which introduces
> batch-oriented asynchronous processing capabilities to Apache Flink. This
> feature is specifically designed for high-latency, high-throughput
> workloads such as AI/ML model inference.
> 🎯 Motivation
>
> Current Flink's AsyncFunction processes one element at a time, which is
> suboptimal for many modern use cases:
>
>    1. *Low GPU Utilization*: ML inference frameworks (TensorFlow, PyTorch,
>    ONNX) achieve optimal throughput with batched inputs, but single-element
>    async calls underutilize GPUs (typically ~20% utilization)
>    2. *High Overhead*: External service calls that support batch APIs incur
>    unnecessary network overhead when invoked per-element
>    3. *Inefficient Resource Usage*: Many systems (databases, caches,
>    inference services) can process batches 3-12x more efficiently than
>    individual requests
>
> 💡 Proposed Solution
>
> We propose a new AsyncBatchFunction interface that allows users to process
> multiple stream elements in a single async operation:
>
> @PublicEvolving
> public interface AsyncBatchFunction<IN, OUT> extends Function,
> Serializable {
>     void asyncInvokeBatch(List<IN> inputs, ResultFuture<OUT>
> resultFuture) throws Exception;
> }
>
> *Key Features:*
>
>    - ✅ *Dual Triggering*: Size-based (maxBatchSize) and time-based
>    (batchTimeout) batch triggering
>    - ✅ *Ordering Modes*: Both ordered and unordered processing supported
>    - ✅ *Retry & Timeout*: Configurable strategies (fixed delay, exponential
>    backoff)
>    - ✅ *Rich Metrics*: 8 inference-specific metrics (batch size,
>    throughput, latency, etc.)
>    - ✅ *Full Stack*: Complete integration with DataStream API, Table
>    API/SQL, and PyFlink
>
> 📊 Performance Improvements
>
> Based on our benchmarks with ML inference workloads:
> Batch Size Throughput Improvement P99 Latency GPU Utilization
> 1 (baseline) 1x 50ms ~20%
> 8 3-4x 80ms ~60%
> 16 5-7x 120ms ~80%
> 32 8-12x 200ms ~95%🔧 API ExamplesJava DataStream API:
>
> DataStream<Prediction> predictions = AsyncDataStream.unorderedWaitBatch(
>     images,
>     new ModelInferenceFunction(),
>     32,   // maxBatchSize
>     100,  // batchTimeout (ms)
>     TimeUnit.MILLISECONDS
> );
>
> Python API (PyFlink):
>
> predictions = AsyncDataStream.unordered_wait_batch(
>     images,
>     BatchModelInference(),
>     max_batch_size=32,
>     batch_timeout=100
> )
>
> Table API/SQL:
>
> SELECT a.*, b.features
> FROM input_table a
> JOIN model_table FOR SYSTEM_TIME AS OF a.proc_time AS b
> ON a.id = b.id
> -- Automatically uses AsyncBatchLookupFunction
>
> 📦 Implementation Status
>
> The feature has been fully implemented across *7 commits* on the
> FLINK-38825-python branch:
>
>    1. ✅ [5caaa843] Core AsyncBatchFunction and AsyncBatchWaitOperator (+731
>    lines)
>    2. ✅ [35e5398c] Time-based batch triggering (+405 lines)
>    3. ✅ [7ffc0065] Ordered processing support (+1,046 lines)
>    4. ✅ [0de5e881] Inference-specific metrics (+763 lines)
>    5. ✅ [ebf8fee7] Retry and timeout strategies (+1,778 lines)
>    6. ✅ [9baeeb0e] Table API/SQL integration (+1,139 lines)
>    7. ✅ [83e10731] Python API support (+1,187 lines)
>
> *Total Implementation:* 26 files, ~6,500 lines of code, ~1,886 lines of
> tests
> 🔍 Key Design Points for Discussion
>
> I'd appreciate the community's feedback on the following design aspects:
> 1. API Design
>
> *Question:* Is the AsyncBatchFunction interface intuitive enough? Should we
> provide more convenience methods?
>
> The current design requires users to handle the entire batch in one call.
> We considered splitting batch results, but opted for simplicity in v1.
> 2. Triggering Strategy
>
> *Question:* Should we support adaptive batching based on load/latency?
>
> Current design: Trigger when maxBatchSize is reached OR batchTimeout
> expires
> (whichever comes first). This is simple but static.
> 3. Timeout Semantics
>
> *Question:* Which timeout mode should be the default?
>
> We offer two modes:
>
>    - BATCH: Timeout applies to entire batch operation (default)
>    - ELEMENT: Timeout per element (batch timeout = element_timeout ×
>    batch_size)
>
> 4. Ordering Guarantees
>
> *Question:* Should we add a "best-effort ordering" mode for better
> performance?
>
> The ordered variant (orderedWaitBatch) maintains strict order but buffers
> results. A relaxed mode could improve throughput.
> 5. Backpressure Handling
>
> *Question:* How should batch accumulation interact with backpressure?
>
> Current behavior: Continue accumulating until size/timeout triggers. Should
> we flush partial batches under high backpressure?
> 6. Checkpoint Semantics
>
> *Question:* Are the checkpoint semantics correct?
>
> In-flight batches are currently buffered in operator state during
> checkpointing. This ensures exactly-once but increases checkpoint size.
> 📚 Documentation
>
> Full FLIP documents are available on the branch:
>
>    - *JIRA:* FLINK-38825 <
> https://issues.apache.org/jira/browse/FLINK-38825>
>    - *Branch:* FLINK-38825-python (based on master)
>
> 🎯 Use Cases Beyond ML Inference
>
> While designed for ML inference, this feature benefits:
>
>    - *Batch Translation APIs*: Google Translate, DeepL batch endpoints
>    - *Geocoding Services*: Batch address → coordinates conversion
>    - *Database Bulk Operations*: Batch inserts/updates to ClickHouse,
>    Cassandra
>    - *Feature Stores*: Batch feature retrieval (Feast, Tecton)
>    - *Cache Warming*: Batch Redis/Memcached operations
>
> 🤝 Request for Feedback
>
> I'd love to hear your thoughts on:
>
>    1. *API Design*: Is it intuitive? Any missing convenience methods?
>    2. *Default Configuration*: What should be the default batch
>    size/timeout?
>    3. *Performance Trade-offs*: Acceptable latency increase for throughput
>    gains?
>    4. *Integration Points*: Any concerns with Table API or Python API
>    design?
>    5. *Testing Strategy*: Are there specific scenarios we should benchmark?
>    6. *Documentation*: What additional examples would be helpful?
>
> ⏰ Timeline
>
> If the community is supportive, I'd like to:
>
>    - *Week 1-2*: Address feedback and refine design
>    - *Week 3*: Create official FLIP on Confluence
>    - *Week 4+*: Code review and merge preparation
>
> Looking forward to your feedback and suggestions!
>
> *Best regards,*
> Feat Zhang
>

Reply via email to