featzhang opened a new pull request, #27361:
URL: https://github.com/apache/flink/pull/27361

   ## What is the purpose of the change
   
   This PR adds **Python DataStream API integration** for the existing Java 
`AsyncBatchWaitOperator` runtime capability, enabling Python-based AI/ML 
inference and external service calls to use **batch-oriented async execution**.
   
   This is a **pure integration PR** - all batching, scheduling, and async 
execution logic is reused from the Java side.
   
   ## Brief change log
   
   ### New Python Classes
   
   | Class | Description |
   |-------|-------------|
   | `AsyncBatchFunction` | Python async batch function interface |
   | `AsyncBatchFunctionDescriptor` | Descriptor for serialization and configuration |
   | `AsyncBatchOperation` | Runtime operation for batch async execution |
   | `BatchResultDistributor` | Distributes batch results to individual elements |
   
   ### Modified Files
   
   | File | Changes |
   |------|---------|
   | `async_data_stream.py` | Added `unordered_wait_batch()` and `ordered_wait_batch()` methods |
   | `functions.py` | Added `AsyncBatchFunction` and `AsyncBatchFunctionDescriptor` classes |
   | `__init__.py` | Exported `AsyncBatchFunction` |
   | `flink-fn-execution.proto` | Added `ASYNC_BATCH` function type |
   
   ### Test Files
   
   | File | Description |
   |------|-------------|
   | `test_async_batch_function.py` | Comprehensive tests for batch async functionality |
   
   ## API Design
   
   ### AsyncBatchFunction
   
   ```python
   class AsyncBatchFunction(Function, Generic[IN, OUT]):
       """
       A function to trigger Async I/O operation with batch processing support.
       Designed for AI/ML inference scenarios where batching improves
       throughput.
       """
   
       @abstractmethod
       async def async_invoke_batch(self, inputs: List[IN]) -> List[OUT]:
           """
           Trigger async operation for a batch of stream inputs.
           Returns a list of results, one for each input element.
           """
           pass
   
       def timeout_batch(self, inputs: List[IN]) -> List[OUT]:
           """
           Called when async_invoke_batch times out.
           Override to provide custom timeout handling.
           """
           raise TimeoutError("Async batch function call has timed out")
   ```
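
The contract above (one result per input, with `timeout_batch` as the fallback) can be sketched without a Flink cluster. The snippet below is only an illustration under stated assumptions: `BatchFunctionSketch` is a stand-in that mirrors the two methods and is not the PyFlink class, and `run_batch` is a hypothetical driver built on `asyncio.wait_for`; the real timeout handling lives in the operator and may differ.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Generic, List, TypeVar

IN = TypeVar("IN")
OUT = TypeVar("OUT")


class BatchFunctionSketch(ABC, Generic[IN, OUT]):
    """Stand-in mirroring the two AsyncBatchFunction methods (not PyFlink)."""

    @abstractmethod
    async def async_invoke_batch(self, inputs: List[IN]) -> List[OUT]:
        ...

    def timeout_batch(self, inputs: List[IN]) -> List[OUT]:
        raise TimeoutError("Async batch function call has timed out")


class SlowDoubler(BatchFunctionSketch[int, int]):
    """Doubles each input after an artificial delay."""

    def __init__(self, delay_s: float):
        self.delay_s = delay_s

    async def async_invoke_batch(self, inputs: List[int]) -> List[int]:
        await asyncio.sleep(self.delay_s)
        return [x * 2 for x in inputs]

    def timeout_batch(self, inputs: List[int]) -> List[int]:
        # Custom fallback: one placeholder result per input element.
        return [-1 for _ in inputs]


async def run_batch(fn, inputs, timeout_s: float):
    """Hypothetical driver: await the batch call, fall back on timeout."""
    try:
        results = await asyncio.wait_for(fn.async_invoke_batch(inputs), timeout_s)
    except asyncio.TimeoutError:
        results = fn.timeout_batch(inputs)
    # The docstring promises one result for each input element.
    if len(results) != len(inputs):
        raise ValueError("expected one result per input element")
    return results


fast = asyncio.run(run_batch(SlowDoubler(0.0), [1, 2, 3], timeout_s=1.0))
slow = asyncio.run(run_batch(SlowDoubler(0.5), [1, 2, 3], timeout_s=0.05))
```

Here `fast` holds the doubled values, while `slow` holds the placeholders returned by the overridden `timeout_batch`. Without the override, the default implementation would raise `TimeoutError` instead.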
   
   ### AsyncDataStream Methods
   
   ```python
   # Unordered batch execution
   AsyncDataStream.unordered_wait_batch(
       data_stream,
       async_batch_function,
       timeout,             # Overall timeout
       batch_size,          # Max elements per batch
       batch_timeout=None,  # Optional batch flush timeout
       capacity=100,        # Max in-flight operations
       output_type=None     # Output type info
   )
   
   # Ordered batch execution (preserves input order)
   AsyncDataStream.ordered_wait_batch(
       data_stream,
       async_batch_function,
       timeout,
       batch_size,
       batch_timeout=None,
       capacity=100,
       output_type=None
   )
   ```
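
To make the interplay of `batch_size` and `batch_timeout` concrete, here is a small pure-Python simulation. It is a sketch under assumptions, not the operator's code: `simulate_batches` is a hypothetical helper, times are plain floats in seconds, the flush timer is assumed to start when the first element enters an empty buffer, and the positive `batch_size` check is a guess at the kind of validation the API performs.

```python
from typing import List, Optional, Sequence, Tuple


def simulate_batches(
    events: Sequence[Tuple[float, str]],
    batch_size: int,
    batch_timeout: Optional[float] = None,
) -> List[List[str]]:
    """Group (arrival_time, element) events into batches.

    A batch is flushed when it reaches batch_size, when batch_timeout has
    elapsed since its first element arrived, or at end of input.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")  # assumed validation

    batches: List[List[str]] = []
    buffer: List[str] = []
    first_arrival = 0.0

    for arrival, element in events:
        # Assumed timer semantics: the timeout fired before this arrival.
        timed_out = (
            bool(buffer)
            and batch_timeout is not None
            and arrival - first_arrival >= batch_timeout
        )
        if timed_out:
            batches.append(buffer)
            buffer = []
        if not buffer:
            first_arrival = arrival
        buffer.append(element)
        # Size trigger: flush as soon as the batch is full.
        if len(buffer) >= batch_size:
            batches.append(buffer)
            buffer = []

    # End-of-input flush for any remaining partial batch.
    if buffer:
        batches.append(buffer)
    return batches


events = [(0.00, "a"), (0.01, "b"), (0.02, "c"),
          (0.03, "d"), (0.50, "e"), (0.51, "f")]
with_timeout = simulate_batches(events, batch_size=3, batch_timeout=0.1)
size_only = simulate_batches(events, batch_size=3)
```

With the timeout set, `"d"` is flushed alone because the next element arrives long after the timer would have fired. With `batch_timeout=None`, only the size trigger and the end-of-input flush apply.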
   
   ## Example Usage
   
   ```python
   from pyflink.datastream import AsyncDataStream, AsyncBatchFunction
   from pyflink.common import Time, Types, Row
   from typing import List
   
   class MLInferenceFunction(AsyncBatchFunction):
       """Batch ML model inference function."""
   
       async def async_invoke_batch(self, inputs: List[Row]) -> List[float]:
           # Batch inference call to ML model.
           # `extract_features` and `model` are placeholders for user code.
           features = [self.extract_features(row) for row in inputs]
           predictions = await self.model.predict_batch(features)
           return predictions.tolist()
   
   # Apply to an existing DataStream `ds`
   result = AsyncDataStream.unordered_wait_batch(
       ds,
       MLInferenceFunction(),
       timeout=Time.seconds(30),
       batch_size=32,
       batch_timeout=Time.milliseconds(100),
       output_type=Types.FLOAT()
   )
   ```
   
   ## Testing
   
   The PR includes comprehensive tests covering:
   
   1. **Basic batch execution** - Verify batching and results
   2. **Batch size triggering** - Verify batches trigger at configured size
   3. **Batch timeout triggering** - Verify partial batches flush on timeout
   4. **Exception propagation** - Verify errors fail the job
   5. **Timeout handling** - Verify `timeout_batch` is called
   6. **Ordered execution** - Verify output order matches input
   7. **End-of-input flush** - Verify remaining elements are flushed
   8. **Validation errors** - Verify parameter validation
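
The difference that item 6 checks, ordered versus unordered emission, can be pictured with plain `asyncio`. This is a hedged illustration only: `infer`, `emit_ordered`, and `emit_unordered` are hypothetical names, the delays are artificial, and the Java operator uses its own queueing rather than `gather` or `as_completed`.

```python
import asyncio
from typing import List


async def infer(batch: List[int], delay_s: float) -> List[int]:
    """Pretend inference call that takes delay_s seconds."""
    await asyncio.sleep(delay_s)
    return [x * 10 for x in batch]


async def emit_ordered(batches, delays) -> List[int]:
    # gather returns results in submission order, regardless of
    # which batch finishes first.
    results = await asyncio.gather(
        *(infer(b, d) for b, d in zip(batches, delays))
    )
    return [r for batch in results for r in batch]


async def emit_unordered(batches, delays) -> List[int]:
    # as_completed yields batches in completion order.
    out: List[int] = []
    pending = [infer(b, d) for b, d in zip(batches, delays)]
    for finished in asyncio.as_completed(pending):
        out.extend(await finished)
    return out


batches = [[1, 2], [3, 4]]
delays = [0.2, 0.01]  # the first batch is deliberately the slow one

ordered = asyncio.run(emit_ordered(batches, delays))
unordered = asyncio.run(emit_unordered(batches, delays))
```

In this sketch the ordered variant emits the slow first batch before the fast second one, while the unordered variant emits whichever batch completes first. Element order inside a batch is preserved in both cases, which is an assumption of the sketch rather than something the PR text states.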
   
   ## Design Principles
   
   1. **Reuse Java Runtime** - All batching logic is in `AsyncBatchWaitOperator`
   2. **Follow Existing Patterns** - API mirrors `AsyncFunction` integration
   3. **Explicit, Readable Code** - No complex abstractions
   4. **Backward Compatible** - Existing APIs unchanged
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   ```bash
   cd flink-python
   python -m pytest pyflink/datastream/tests/test_async_batch_function.py -v
   ```
   
   ## Does this pull request potentially affect one of the following parts
   
   - Dependencies (does it add or upgrade a dependency): **no**
   - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **yes**
   - The serializers: **no**
   - The runtime per-record code paths (performance sensitive): **no**
   - Anything that affects deployment or recovery: **no**
   - The S3 file system connector: **no**
   
   ## Documentation
   
   - Does this pull request introduce a new feature? **yes**
   - If yes, how is the feature documented? **Docstrings**

