[ 
https://issues.apache.org/jira/browse/FLINK-39074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-39074:
-----------------------------------
    Labels: pull-request-available  (was: )

> Add built-in AI model inference capability to PyFlink with automatic 
> lifecycle management
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-39074
>                 URL: https://issues.apache.org/jira/browse/FLINK-39074
>             Project: Flink
>          Issue Type: New Feature
>          Components: API / Python
>            Reporter: featzhang
>            Priority: Major
>              Labels: pull-request-available
>
> PyFlink currently lacks native support for AI model inference, forcing users 
> to manually manage models, batching, and resources. This proposal introduces 
> a new DataStream.infer() API that provides out-of-the-box AI inference 
> capabilities with automatic lifecycle management.
> h2. MOTIVATION
> Current Pain Points:
> • Users must manually load/unload models
> • No built-in batching for inference optimization
> • No standardized resource management
> • Lack of warmup and performance optimization strategies
> User Impact:
> • Complicated boilerplate code for inference
> • Suboptimal performance due to lack of batching
> • Resource leaks from improper model management
> h2. PROPOSED API
> Text Embedding Example:
> {code:python}
> from pyflink.datastream import StreamExecutionEnvironment
>
> env = StreamExecutionEnvironment.get_execution_environment()
> data_stream = env.from_collection(["PyFlink adds built-in inference."])
> result = data_stream.infer(
>     model="sentence-transformers/all-MiniLM-L6-v2",
>     input_col="text",
>     output_col="embedding"
> )
> {code}
> Sentiment Classification Example:
> {code:python}
> sentiment = data_stream.infer(
>     model="distilbert-base-uncased-finetuned-sst-2-english",
>     input_col="text",
>     output_col="sentiment",
>     task_type="classification"
> )
> {code}
> h2. ARCHITECTURE
> {noformat}
> DataStream.infer()
>     ↓
> InferenceFunction (MapFunction)
>     ↓
> ModelLifecycleManager
>     ├── Model Loading (HuggingFace/Local)
>     ├── Model Warmup
>     └── Resource Management
>     ↓
> BatchInferenceExecutor
>     ├── Tokenization
>     ├── Batch Inference
>     └── Result Extraction
>     ↓
> InferenceMetrics
> {noformat}
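> As a rough illustration of this pipeline, the lifecycle hooks could look like the following plain-Python sketch (the class and the toy token-count "model" are hypothetical; the real operator would subclass PyFlink's {{MapFunction}} and call into a HuggingFace model):
> {code:python}
> class InferenceFunction:
>     """Hypothetical sketch of the open/map/close lifecycle.
>     The "model" here is a stand-in that returns a token count,
>     not a real HuggingFace model."""
>
>     def __init__(self, model, input_col, output_col):
>         self.model_name = model
>         self.input_col = input_col
>         self.output_col = output_col
>         self._model = None
>
>     def open(self):
>         # ModelLifecycleManager: load from HuggingFace Hub or a local
>         # path, then warm up once with a dummy input.
>         self._model = lambda text: [float(len(text.split()))]
>         self._model("warmup")
>
>     def map(self, record):
>         # BatchInferenceExecutor would tokenize and batch; this sketch
>         # runs one record at a time for clarity.
>         out = dict(record)
>         out[self.output_col] = self._model(record[self.input_col])
>         return out
>
>     def close(self):
>         # Resource management: drop the model so memory can be reclaimed.
>         self._model = None
> {code}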
> h2. KEY FEATURES
> Model Lifecycle Management:
> • Automatic model loading from HuggingFace Hub or local path
> • Model warmup for optimal performance
> • Proper cleanup and resource deallocation
> Batch Inference:
> • Configurable batch size
> • Batch timeout control
> • Future: Integration with AsyncBatchFunction (FLINK-38825)
> Multi-Task Support:
> • Text embedding
> • Text classification
> • Text generation
> Resource Optimization:
> • CPU/GPU device selection
> • FP16 precision support
> • CUDA memory management
> Metrics & Monitoring:
> • Inference latency (avg/p50/p95/p99)
> • Throughput tracking
> • Error rate monitoring
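> The size/timeout batching described above can be sketched in plain Python (a hypothetical {{BatchCollector}}, not part of the proposal's code; parameter names and defaults mirror the configuration table below):
> {code:python}
> import time
>
> class BatchCollector:
>     """Hypothetical sketch of batch_size / max_batch_timeout_ms
>     batching. A real operator would also flush from a timer so a
>     lone record never waits past the timeout."""
>
>     def __init__(self, batch_size=32, max_batch_timeout_ms=100):
>         self.batch_size = batch_size
>         self.timeout_s = max_batch_timeout_ms / 1000.0
>         self.buffer = []
>         self.first_arrival = None
>
>     def add(self, record):
>         # Returns a full batch when either threshold is hit, else None.
>         if self.first_arrival is None:
>             self.first_arrival = time.monotonic()
>         self.buffer.append(record)
>         full = len(self.buffer) >= self.batch_size
>         timed_out = time.monotonic() - self.first_arrival >= self.timeout_s
>         return self.flush() if (full or timed_out) else None
>
>     def flush(self):
>         batch, self.buffer, self.first_arrival = self.buffer, [], None
>         return batch
> {code}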
> h2. CONFIGURATION OPTIONS
> ||Parameter||Type||Default||Description||
> |model|string|(required)|Model name (HuggingFace) or local path|
> |input_col|string|(required)|Input column name|
> |output_col|string|(required)|Output column name|
> |batch_size|int|32|Batch size for inference|
> |max_batch_timeout_ms|int|100|Max batch wait time in milliseconds|
> |model_warmup|bool|true|Enable model warmup|
> |device|string|"cpu"|Device: cpu, cuda:0, etc.|
> |num_workers|int|1|Number of worker processes|
> |task_type|string|"embedding"|Task type: embedding/classification/generation|
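> Putting the table together, a fully configured call under the proposed API might look like this (illustrative only; {{infer()}} and its parameters are the subject of this proposal, not an existing API):
> {code:python}
> embeddings = data_stream.infer(
>     model="sentence-transformers/all-MiniLM-L6-v2",  # HuggingFace name or local path
>     input_col="text",
>     output_col="embedding",
>     batch_size=64,                # override the default of 32
>     max_batch_timeout_ms=50,
>     model_warmup=True,
>     device="cuda:0",              # GPU inference
>     num_workers=2,
>     task_type="embedding"
> )
> {code}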



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
