featzhang opened a new pull request, #27361:
URL: https://github.com/apache/flink/pull/27361
## What is the purpose of the change
This PR adds **Python DataStream API integration** for the existing Java
`AsyncBatchWaitOperator` runtime capability, enabling Python-based AI/ML
inference and external service calls to use **batch-oriented async execution**.
This is a **pure integration PR** - all batching, scheduling, and async
execution logic is reused from the Java side.
## Brief change log
### New Python Classes
| Class | Description |
|-------|-------------|
| `AsyncBatchFunction` | Python async batch function interface |
| `AsyncBatchFunctionDescriptor` | Descriptor for serialization and configuration |
| `AsyncBatchOperation` | Runtime operation for batch async execution |
| `BatchResultDistributor` | Distributes batch results to individual elements |
### Modified Files
| File | Changes |
|------|---------|
| `async_data_stream.py` | Added `unordered_wait_batch()` and `ordered_wait_batch()` methods |
| `functions.py` | Added `AsyncBatchFunction` and `AsyncBatchFunctionDescriptor` classes |
| `__init__.py` | Exported `AsyncBatchFunction` |
| `flink-fn-execution.proto` | Added `ASYNC_BATCH` function type |
### Test Files
| File | Description |
|------|-------------|
| `test_async_batch_function.py` | Comprehensive tests for batch async functionality |
## API Design
### AsyncBatchFunction
```python
class AsyncBatchFunction(Function, Generic[IN, OUT]):
    """
    A function to trigger Async I/O operation with batch processing support.
    Designed for AI/ML inference scenarios where batching improves throughput.
    """

    @abstractmethod
    async def async_invoke_batch(self, inputs: List[IN]) -> List[OUT]:
        """
        Trigger async operation for a batch of stream inputs.
        Returns a list of results, one for each input element.
        """
        pass

    def timeout_batch(self, inputs: List[IN]) -> List[OUT]:
        """
        Called when async_invoke_batch times out.
        Override to provide custom timeout handling.
        """
        raise TimeoutError("Async batch function call has timed out")
```
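The "one result per input" contract can be tried without a Flink cluster. The following is a minimal sketch, not PyFlink code: `AsyncBatchFunctionSketch`, `UppercaseBatch`, and `invoke_checked` are hypothetical stand-ins that only mirror the method names shown above, and the length check is an assumption about what a caller would reasonably enforce.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Generic, List, TypeVar

IN = TypeVar("IN")
OUT = TypeVar("OUT")


class AsyncBatchFunctionSketch(ABC, Generic[IN, OUT]):
    """Hypothetical stand-in mirroring the proposed interface (not PyFlink)."""

    @abstractmethod
    async def async_invoke_batch(self, inputs: List[IN]) -> List[OUT]:
        ...

    def timeout_batch(self, inputs: List[IN]) -> List[OUT]:
        raise TimeoutError("Async batch function call has timed out")


class UppercaseBatch(AsyncBatchFunctionSketch[str, str]):
    """Toy implementation: one output per input, in the same order."""

    async def async_invoke_batch(self, inputs: List[str]) -> List[str]:
        await asyncio.sleep(0)  # placeholder for a real remote call
        return [s.upper() for s in inputs]


def invoke_checked(fn: AsyncBatchFunctionSketch, inputs: list) -> list:
    """Run one batch and check the one-result-per-input contract (assumed check)."""
    results = asyncio.run(fn.async_invoke_batch(inputs))
    if len(results) != len(inputs):
        raise ValueError(
            f"expected {len(inputs)} results, got {len(results)}"
        )
    return results
```

Calling `invoke_checked(UppercaseBatch(), ["a", "b"])` returns the two uppercased strings; a function that drops or adds elements would raise `ValueError` instead.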
### AsyncDataStream Methods
```python
# Unordered batch execution
AsyncDataStream.unordered_wait_batch(
    data_stream,
    async_batch_function,
    timeout,             # Overall timeout
    batch_size,          # Max elements per batch
    batch_timeout=None,  # Optional batch flush timeout
    capacity=100,        # Max in-flight operations
    output_type=None     # Output type info
)

# Ordered batch execution (preserves input order)
AsyncDataStream.ordered_wait_batch(
    data_stream,
    async_batch_function,
    timeout,
    batch_size,
    batch_timeout=None,
    capacity=100,
    output_type=None
)
```
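To illustrate how `batch_size` and `batch_timeout` might interact, here is a small standalone sketch that groups timestamped arrivals into batches. It is an illustration only: the real batching lives in the Java `AsyncBatchWaitOperator`, and this sketch assumes that the flush timer starts at the first element of each open batch, which the description above does not confirm. `plan_batches` is a hypothetical helper, not part of PyFlink.

```python
from typing import List, Optional, Tuple


def plan_batches(
    arrivals: List[Tuple[float, str]],
    batch_size: int,
    batch_timeout: Optional[float] = None,
) -> List[List[str]]:
    """Group (arrival_time_seconds, value) pairs into batches.

    Assumed rules: flush when the batch reaches batch_size, flush a partial
    batch once batch_timeout has elapsed since its first element, and flush
    whatever is left at end of input.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")

    batches: List[List[str]] = []
    current: List[str] = []
    first_ts = 0.0

    for ts, value in arrivals:
        # A real operator would fire a timer; here the timeout is detected
        # lazily when the next element arrives, which yields the same grouping.
        if current and batch_timeout is not None and ts - first_ts >= batch_timeout:
            batches.append(current)
            current = []
        if not current:
            first_ts = ts
        current.append(value)
        if len(current) >= batch_size:
            batches.append(current)
            current = []

    if current:
        batches.append(current)  # end-of-input flush
    return batches
```

With arrivals at 0.0, 0.01, 0.02, and 0.5 seconds, `batch_size=2`, and `batch_timeout=0.1`, the first two elements flush on size, the third flushes on timeout, and the fourth flushes at end of input.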
## Example Usage
```python
from typing import List

from pyflink.common import Time, Types, Row
from pyflink.datastream import AsyncDataStream, AsyncBatchFunction


class MLInferenceFunction(AsyncBatchFunction):
    """Batch ML model inference function."""

    async def async_invoke_batch(self, inputs: List[Row]) -> List[float]:
        # Batch inference call to ML model.
        # `self.extract_features` and `self.model` are user-provided and
        # not shown here (e.g. initialized in open()).
        features = [self.extract_features(row) for row in inputs]
        predictions = await self.model.predict_batch(features)
        return predictions.tolist()


# Apply to data stream (`ds` is an existing DataStream)
result = AsyncDataStream.unordered_wait_batch(
    ds,
    MLInferenceFunction(),
    timeout=Time.seconds(30),
    batch_size=32,
    batch_timeout=Time.milliseconds(100),
    output_type=Types.FLOAT()
)
```
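The example above does not override `timeout_batch`, so a timeout would raise `TimeoutError`. A function that prefers fallback values could override the hook, roughly as sketched below. This runs standalone with plain `asyncio`: `SlowScorer` and `call_with_timeout` are hypothetical, and the driver only approximates how a runtime might invoke the hook. It is not the operator's actual logic.

```python
import asyncio
from typing import List


class SlowScorer:
    """Hypothetical class shaped like an AsyncBatchFunction subclass."""

    async def async_invoke_batch(self, inputs: List[str]) -> List[float]:
        await asyncio.sleep(1.0)  # simulates a slow external service
        return [1.0 for _ in inputs]

    def timeout_batch(self, inputs: List[str]) -> List[float]:
        # Fallback: one sentinel score per input instead of failing the job.
        return [-1.0 for _ in inputs]


async def call_with_timeout(fn, inputs: List[str], timeout_seconds: float) -> List[float]:
    """Assumed driver: run the batch, fall back to timeout_batch on timeout."""
    try:
        return await asyncio.wait_for(fn.async_invoke_batch(inputs), timeout_seconds)
    except asyncio.TimeoutError:
        return fn.timeout_batch(inputs)


if __name__ == "__main__":
    print(asyncio.run(call_with_timeout(SlowScorer(), ["a", "b"], 0.01)))
```

Returning one fallback value per input keeps the one-result-per-input contract intact even on the timeout path.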
## Testing
The PR includes comprehensive tests covering:
1. **Basic batch execution** - Verify batching and results
2. **Batch size triggering** - Verify batches trigger at configured size
3. **Batch timeout triggering** - Verify partial batches flush on timeout
4. **Exception propagation** - Verify errors fail the job
5. **Timeout handling** - Verify `timeout_batch` is called
6. **Ordered execution** - Verify output order matches input
7. **End-of-input flush** - Verify remaining elements are flushed
8. **Validation errors** - Verify parameter validation
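
The difference between ordered and unordered emission (item 6) can be sketched with plain `asyncio`. This is a conceptual illustration under simplifying assumptions, not the operator's implementation: `run_batch`, `emit_ordered`, and `emit_unordered` are hypothetical helpers, and the delays stand in for batches that finish at different times.

```python
import asyncio
from typing import List, Tuple

Work = List[Tuple[List[int], float]]  # (batch, simulated latency in seconds)


async def run_batch(batch: List[int], delay: float) -> List[int]:
    """Simulated async batch call: waits, then returns one result per input."""
    await asyncio.sleep(delay)
    return [x * 10 for x in batch]


async def emit_ordered(work: Work) -> List[int]:
    """Results are emitted in submission order, regardless of completion order."""
    results = await asyncio.gather(*(run_batch(b, d) for b, d in work))
    return [x for batch in results for x in batch]


async def emit_unordered(work: Work) -> List[int]:
    """Results are emitted as soon as each batch completes."""
    out: List[int] = []
    for finished in asyncio.as_completed([run_batch(b, d) for b, d in work]):
        out.extend(await finished)
    return out


if __name__ == "__main__":
    work = [([1, 2], 0.2), ([3, 4], 0.01)]  # the first batch is the slower one
    print(asyncio.run(emit_ordered(work)))
    print(asyncio.run(emit_unordered(work)))
```

In the ordered variant the fast second batch waits for the slow first one; in the unordered variant it is emitted first, which trades ordering for lower latency.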
## Design Principles
1. **Reuse Java Runtime** - All batching logic is in `AsyncBatchWaitOperator`
2. **Follow Existing Patterns** - API mirrors `AsyncFunction` integration
3. **Explicit, Readable Code** - No complex abstractions
4. **Backward Compatible** - Existing APIs unchanged
## Verifying this change
This change added tests and can be verified as follows:
```bash
cd flink-python
python -m pytest pyflink/datastream/tests/test_async_batch_function.py -v
```
## Does this pull request potentially affect one of the following parts
- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **yes**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: **no**
- The S3 file system connector: **no**
## Documentation
- Does this pull request introduce a new feature? **yes**
- If yes, how is the feature documented? **Docstrings**