Hi folks,

I am writing to follow up on this FLIP. I recently implemented a PoC for 
ML_PREDICT using a local Python model based on the current PyFlink UDTF framework.
Code can be found here: 
https://github.com/bgeng777/flink/commit/8eb002944ee8ce863680923ff53622eb10813777
I believe the proposal can work and want to share some findings:
My implementation follows most of the SQL section design of the FLIP; the only 
difference is that I use the 'model' option instead of 'model-directory-path' 
(see my previous reply in this thread for details), which I think is expressive enough.
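
For context, this is roughly how it could look from the PyFlink side, assuming the 
FLIP-437 `CREATE MODEL` / `ML_PREDICT` syntax; the provider value, model name, and 
column names below are placeholders, and only the `model` option itself is the point 
being proposed here:

```
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Hypothetical DDL sketch: 'python-local' and the schemas are illustrative only.
t_env.execute_sql("""
    CREATE MODEL my_llm
    INPUT (content STRING, comment STRING)
    OUTPUT (generated_text STRING, text_len INT)
    WITH (
        'provider' = 'python-local',
        'model' = 'Qwen/Qwen3-0.6B'
    )
""")

# Prediction follows the FLIP's ML_PREDICT table-function syntax.
t_env.execute_sql("""
    SELECT * FROM ML_PREDICT(
        TABLE input_table, MODEL my_llm, DESCRIPTOR(content, comment))
""")
```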

On the Python side, since ML_PREDICT is a specialized UDTF and PyFlink already 
supports UDTFs, it is natural to directly reuse the udtf 
<https://github.com/apache/flink/blob/master/flink-python/pyflink/table/udf.py#L692>
 decorator to define a Python predict class. One point is that, since the SQL 
primitive already defines the OUTPUT schema, we may need to enhance PyFlink's udtf 
interface to allow users to omit `result_types` and directly use the definition 
from the DDL. Another point is that we need to pass some model parameters 
(e.g. temperature, GPU/CPU settings) to the Python process, so we may need to 
enhance PyFlink so that users can directly read the model parameters inside the 
UDTF. After introducing such improvements, we can define the Python logic like this:

```
from pyflink.table.udf import TableFunction, udtf
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline


class HuggingFaceFunc(TableFunction):
    def __init__(self):
        self.model = None
        self.tokenizer = None
        self.pipeline = None

    def open(self, runtime_context):
        # Read the model path/name passed down from the DDL options.
        model_dir = runtime_context.get_job_parameter("model", None)
        ...
        self.tokenizer = AutoTokenizer.from_pretrained(
            model_dir, trust_remote_code=True
        )
        # `device` is resolved in the elided setup code above.
        self.model = AutoModelForCausalLM.from_pretrained(
            model_dir, device_map=device
        )
        self.pipeline = pipeline(
            "text-generation", model=self.model, tokenizer=self.tokenizer
        )
        assert self.model is not None

    def eval(self, content: str, comment: str):
        output = self.pipeline(content)[0]["generated_text"]
        yield output, len(output)


# With the proposed enhancement, result_types is derived from the DDL's OUTPUT schema.
HuggingFaceModelUDTF = udtf(HuggingFaceFunc())
```
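
For comparison, with today's udtf API the output row type has to be repeated even 
though the `CREATE MODEL` DDL already declares it; the proposed enhancement would 
let this be derived from the OUTPUT schema instead. A minimal sketch of the current 
form (the concrete types here are illustrative):

```
from pyflink.table import DataTypes
from pyflink.table.udf import udtf

# Current API: result_types duplicates what the DDL's OUTPUT clause already says.
HuggingFaceModelUDTF = udtf(
    HuggingFaceFunc(),
    result_types=[DataTypes.STRING(), DataTypes.INT()],
)
```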

To verify this design, I have implemented two Python predict classes, one using 
HuggingFace and another based on vLLM (code can be found here 
<https://github.com/bgeng777/flink/blob/bgeng777/python-ml-predict-poc/flink-end-to-end-tests/flink-python-test/python/huggingface_udtf.py>).
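
For illustration only (this is not the linked PoC code), a minimal sketch of how the 
vLLM variant can be shaped, assuming vLLM's `LLM`/`SamplingParams` API and the same 
job-parameter lookup as above; the sampling values are placeholders:

```
from pyflink.table.udf import TableFunction
from vllm import LLM, SamplingParams


class VLLMFunc(TableFunction):
    def open(self, runtime_context):
        # Same idea as the HuggingFace variant: the model path/name comes from
        # the job parameters populated from the DDL options.
        model = runtime_context.get_job_parameter("model", None)
        self.llm = LLM(model=model)
        self.sampling_params = SamplingParams(temperature=0.8, max_tokens=128)

    def eval(self, content: str, comment: str):
        output = self.llm.generate([content], self.sampling_params)[0]
        text = output.outputs[0].text
        yield text, len(text)
```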

So to conclude, I am wondering whether we really need a new Python 
`class PredictFunction(TableFunction)`.

Thanks for reading and looking forward to your thoughts.

Best,
Biao Geng


> On Nov 21, 2025, at 19:28, Geng Biao <[email protected]> wrote:
> 
> Hi Swapna,
> 
> Thanks for the FLIP and sorry for the late reply. After reading it, I have 
> some comments about the proposal.
> 
> 1. I notice that the parameter `model-directory-path` is introduced. It 
> should work, but I see that some popular Python model libraries, like 
> `transformers` and `vllm`, tend to directly use the parameter name `model`, 
> which can be either a local path or a simple name like `Qwen/Qwen3-0.6B`. They 
> then check whether `model` is a path or a simple name and automatically download 
> it to a local cache directory if necessary. Considering use cases based on these 
> libraries, maybe `model-directory-path` could be renamed to `model`, which can 
> be either a real path or a simple model name.
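> 
> As a small illustration of that convention (hub id vs. local directory), purely 
> for reference:
> 
> ```
> from transformers import AutoTokenizer
> 
> # Both go through the same `model` argument: a hub id is downloaded to the
> # local cache, a directory path is used directly.
> AutoTokenizer.from_pretrained("Qwen/Qwen3-0.6B")
> AutoTokenizer.from_pretrained("/path/to/local/model")
> ```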
> 
> 2. There is a `set_model_config` in the Python PredictFunction interface. I 
> see that in the previous discussion you were considering passing a 
> PredictRuntimeContext in `open`. I agree with this choice as well, as it 
> makes the logic clearer and users can simply use the configs in the `open` 
> method to initialize their model. I just noticed that the FLIP still has 
> `set_model_config`, so I want to double-check it here.
> 
> 
> 
> Thanks for your time!
> 
> Best,
> Biao Geng
> 
>> On Sep 22, 2025, at 23:36, Swapna Marru <[email protected]> wrote:
>> 
>> Hi Devs,
>> 
>> 
>> I am interested in learning more about the MODEL, ML_PREDICT, and ML_EVALUATE
>> functionality added in the following FLIP:
>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-437%3A+Support+ML+Models+in+Flink+SQL
>> 
>> 
>> I see the original FLIP provides extensibility for local model providers in Flink.
>> 
>> 
>> Is there a way to do pluggable local model providers in Python? For example,
>> generating embeddings using Sentence Transformer models running locally in
>> Flink.
>> 
>> 
>> An option could be to introduce a Model Provider factory implementation in
>> Java that internally uses a predict function in Python. But I see this
>> puts a lot of Java-to-Python communication/translation work inside the
>> provider.
>> 
>> 
>> Something like a PythonRuntimeProvider, alongside PredictRuntimeProvider /
>> AsyncRuntimeProvider, which could handle Java -> Python translation out of
>> the box, would be helpful to de-duplicate that effort.
>> 
>> 
>> Could you please point me to any existing discussions related to this,
>> or any other ways to achieve the same? Please share your thoughts.
>> 
>> 
>> -Thanks,
>> 
>> Swapna Marru
> 
