Hi folks,

I am writing to follow up on this FLIP. I recently implemented a PoC for ML_PREDICT using a local Python model, based on PyFlink's current UDTF framework. The code can be found here: https://github.com/bgeng777/flink/commit/8eb002944ee8ce863680923ff53622eb10813777

I believe the proposal can work and want to share some findings.

My implementation follows most of the SQL section design of the FLIP; the only difference is that I use a 'model' option instead of 'model-directory-path' (see my previous reply in this thread for details), which I think is expressive enough.
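To make the option naming concrete, here is a rough sketch of what the DDL and query could look like with a 'model' option. Apart from 'model' itself, the other option names ('provider', 'temperature') and the exact CREATE MODEL / ML_PREDICT shape shown here are only illustrative assumptions for this sketch, not code from the PoC:

```
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# A dummy input table just to make the example self-contained.
t_env.create_temporary_view(
    "input_table",
    t_env.from_elements([("Hello Flink",)], ["content"]),
)

# 'model' can be a real local path or a plain model name; 'provider' and
# 'temperature' are hypothetical options used only for illustration.
t_env.execute_sql("""
    CREATE MODEL my_llm
    INPUT (content STRING)
    OUTPUT (generated_text STRING, output_length INT)
    WITH (
        'provider' = 'python',
        'model' = 'Qwen/Qwen3-0.6B',
        'temperature' = '0.7'
    )
""")

# Inference then goes through the FLIP's ML_PREDICT table function.
t_env.execute_sql("""
    SELECT * FROM ML_PREDICT(
        TABLE input_table,
        MODEL my_llm,
        DESCRIPTOR(content)
    )
""").print()
```

The idea is that whatever shows up in the WITH clause would be forwarded to the Python process as model parameters, as described below.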
In the Python part, since ML_PREDICT is a specialized UDTF and PyFlink already supports UDTFs, it is natural to directly reuse the udtf decorator <https://github.com/apache/flink/blob/master/flink-python/pyflink/table/udf.py#L692> to define a Python predict class. One point is that, as the SQL primitive already defines the OUTPUT schema, we may need to enhance PyFlink's udtf interface so that users can omit `result_types` and directly reuse the definition from the DDL. Another point is that we need to pass some model parameters (e.g. temperature, GPU/CPU settings) to the Python process, so we may need to enhance PyFlink so that users can directly get those model parameters inside the UDTF.

After introducing such improvements, we can define the Python logic like this:

```
from pyflink.table import TableFunction
from pyflink.table.udf import udtf
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline


class HuggingFaceFunc(TableFunction):
    def __init__(self):
        self.model = None
        self.tokenizer = None
        self.pipeline = None

    def open(self, runtime_context):
        # Model parameters (here the 'model' option) are read from the runtime context.
        model_dir = runtime_context.get_job_parameter("model", None)
        ...  # device selection and other setup elided
        self.tokenizer = AutoTokenizer.from_pretrained(
            model_dir, trust_remote_code=True
        )
        self.model = AutoModelForCausalLM.from_pretrained(model_dir, device_map=device)
        self.pipeline = pipeline(
            "text-generation", model=self.model, tokenizer=self.tokenizer
        )
        assert self.model is not None

    def eval(self, content: str, comment: str):
        # Yield one row per input; the row shape matches the OUTPUT schema of the DDL.
        output = self.pipeline(content)[0]["generated_text"]
        yield output, len(output)


# No result_types here: with the proposed enhancement, the output types
# would be taken from the OUTPUT schema defined in the DDL.
HuggingFaceModelUDTF = udtf(HuggingFaceFunc())
```

I have implemented two Python predict classes, one based on HuggingFace and another based on vLLM, to verify this design (code can be found here <https://github.com/bgeng777/flink/blob/bgeng777/python-ml-predict-poc/flink-end-to-end-tests/flink-python-test/python/huggingface_udtf.py>).

So to conclude, I am wondering whether we really need a new Python `class PredictFunction(TableFunction)`. Thanks for reading, and I am looking forward to your thoughts.

Best,
Biao Geng

> On 21 Nov 2025, at 19:28, Geng Biao <[email protected]> wrote:
> 
> Hi Swapna,
> 
> Thanks for the FLIP and sorry for the late reply. After reading it, I have some comments about the proposal.
> 
> 1. I notice that the parameter `model-directory-path` is introduced. It should work, but I see that some popular Python model libraries, like `transformers` and `vllm`, tend to directly use the parameter name `model`, which can be a local path or a simple name like `Qwen/Qwen3-0.6B`. They then check whether `model` is a path or a simple name and automatically download it to a local cache directory if necessary. Considering use cases based on these libraries, maybe `model-directory-path` can be renamed to `model`, which can be either a real path or a simple model name.
> 
> 2. There is a `set_model_config` in the Python PredictFunction interface. I find that in the previous discussion you were considering passing PredictRuntimeContext in `open`. I agree with this choice as well, since it makes the logic clearer and users can just use the configs in the `open` method to init their model. I just find that the FLIP still has `set_model_config`, so I want to double check it here.
> 
> Thanks for your time!
> 
> Best,
> Biao Geng
> 
>> On 22 Sep 2025, at 23:36, Swapna Marru <[email protected]> wrote:
>> 
>> Hi Devs,
>> 
>> I am interested in learning more about the MODEL, ML_PREDICT, and ML_EVALUATE functionality added in the following FLIP.
>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-437%3A+Support+ML+Models+in+Flink+SQL
>> 
>> I see that the original FLIP has extensibility for local model providers in Flink.
>> 
>> Is there a way to do pluggable local model providers in Python? For example, generating embeddings using Sentence Transformer models running locally in Flink.
>> 
>> An option could be to introduce a model provider factory implementation in Java that internally uses a predict function in Python. But I see this puts a lot of work related to Java-to-Python communication/translation inside the provider.
>> 
>> Something like a PythonRuntimeProvider alongside PredictRuntimeProvider / AsyncRuntimeProvider, which can handle the Java -> Python translation out of the box, would be helpful to de-duplicate that effort.
>> 
>> Can you please point me to any existing discussions related to this? Or any other ways to achieve the same? Please share your thoughts.
>> 
>> -Thanks,
>> Swapna Marru
> 
