Hi Geng,

Thanks for the details.

I focused only on Thread Mode for the first POC. The idea was to extend it
to Process Mode later, after the overall design was approved.
I will go through your implementation, but from my previous checks, I saw
there were some limitations around sending context specific to each
userDefinedFunction in Process Mode,
as the proto carries `PythonFunctionInfo[] userDefinedFunctions` and a single
runtime context shared across all of them.
We would need a separate model config passed for each of the functions.
The proto could be extended, but I haven't drilled into it yet.
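
To make the limitation concrete, here is a minimal sketch in plain Python.
It is not the actual proto or PyFlink API: the `udf.<index>.` key prefix and
both helper names are hypothetical. It only shows what it would mean to carry
one model config per function inside a single shared parameter map:

```python
from typing import Dict, List


def pack_model_configs(configs: List[Dict[str, str]]) -> Dict[str, str]:
    """Flatten one config dict per function into a single shared map.

    Hypothetical scheme: every key is prefixed with `udf.<index>.` so that
    settings for different functions cannot collide in the shared context.
    """
    shared: Dict[str, str] = {}
    for index, config in enumerate(configs):
        for key, value in config.items():
            shared[f"udf.{index}.{key}"] = value
    return shared


def unpack_model_config(shared: Dict[str, str], index: int) -> Dict[str, str]:
    """Recover the config that belongs to the function at position `index`."""
    prefix = f"udf.{index}."
    return {
        key[len(prefix):]: value
        for key, value in shared.items()
        if key.startswith(prefix)
    }


# Two functions in the same operator, each with its own model settings.
shared = pack_model_configs([
    {"model": "Qwen/Qwen3-0.6B", "temperature": "0.7"},
    {"model": "/models/embedder"},
])
```

Without some per-function keying like this (or an extended proto), both
functions would read the same `model` value from the shared context.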


>> I notice that your POC concentrates on the Thread Mode instead of the
Process Mode.

On Wed, Dec 10, 2025 at 6:38 PM Geng Biao <[email protected]> wrote:

> Hi Swapna,
> One more follow-up about the `PredictRuntimeContext`: I notice that your
> POC concentrates on the Thread Mode instead of the Process Mode. I'm not
> sure if the reason is that directly calling python methods like
> `set_model_config` in Thread Mode is more convenient, but I should mention
> that in Process Mode, it might not be ideal to directly call a Python
> method from the Java side. If it's just for passing model configurations,
> perhaps we can refer to my current implementation <
> https://github.com/bgeng777/flink/blob/b029aaca9163a4691d8ce4fcfc9a1a4d6cc67527/flink-python/src/main/java/org/apache/flink/python/util/ProtoUtils.java#L161>
> and reuse the _job_parameters in FunctionContext. This way, we might be
> able to directly reuse the existing UDF's `open` method with
> FunctionContext.
>
> Best,
> Biao Geng
>
> > On Dec 10, 2025, at 01:11, Swapna Marru <[email protected]> wrote:
> >
> > Hi
> >
> > Thanks, Geng Biao, for the interest and the POC implementation. Sorry, I
> > was away for a while and am currently in the middle of some other work.
> > I will be able to get back to this starting next week.
> > Meanwhile, I would like to share my previous POC implementation:
> >
> > https://github.com/apache/flink/commit/bb2d5c4aa096a4eb3a89adb6a5bfecd8e87d3262
> >
> >
> > I will take a look at your implementation.
> > In previous discussions with ShengKai, one more thing we wanted to
> > explore is the flexibility of having the model provider factory in
> > Python as well, instead of having it in Java as proposed in the FLIP.
> > This is something I haven't yet had time to dig into.
> >
> > Yes, I agree on this.
> >>> I notice that the parameter `model-directory-path` is introduced. It
> > should work, but I see that some popular Python model libraries like
> > `transformers` and `vllm` tend to directly use the parameter name `model`,
> > which could be a local path or a simple name like `Qwen/Qwen3-0.6B`. Then
> > they check whether `model` is a path or a simple name and automatically
> > download it to a local cache directory if necessary. Considering use
> > cases based on these libraries, maybe `model-directory-path` can be
> > renamed to `model`, which can be either a real path or a simple model name.
> >
> > Yes, PredictRuntimeContext is more generic and extensible. I will
> > update the FLIP soon.
> >>> There is a `set_model_config` in the Python PredictFunction interface.
> > I find that in the previous discussion, you were considering passing
> > PredictRuntimeContext in `open`. I agree with this choice as well; it
> > makes the logic clearer, and users can just use the configs in the `open`
> > method to init their model. I just find that the FLIP still has
> > `set_model_config`, so I want to double-check it here.
> >
> > -Thanks,
> > M.Swapna
> >
> >
> > On Tue, Dec 2, 2025 at 4:48 AM Geng Biao <[email protected]> wrote:
> >
> >> Hi folks,
> >>
> >> I am writing to follow up on this FLIP. I recently implemented a PoC for
> >> ML_PREDICT using a local Python model, based on the current PyFlink UDTF
> >> framework.
> >> The code can be found here:
> >>
> >> https://github.com/bgeng777/flink/commit/8eb002944ee8ce863680923ff53622eb10813777
> >> I believe the proposal can work and want to share some findings.
> >> My implementation follows most of the SQL section design of the FLIP. The
> >> only difference is that I use the 'model' option instead of
> >> 'model-directory-path' (see my previous reply in this thread for details),
> >> which I think is expressive enough.
> >>
> >> In the Python part, since ML_PREDICT is a specialized UDTF and PyFlink
> >> also supports UDTFs, it is natural to directly reuse the udtf decorator
> >> <https://github.com/apache/flink/blob/master/flink-python/pyflink/table/udf.py#L692>
> >> to define a Python predict class. One point is that, as the SQL
> >> primitive has already defined the OUTPUT schema, we may need to enhance
> >> PyFlink's udtf interface to allow users to use it without specifying
> >> `result_types` and to use the definition from the DDL directly. Another
> >> point is that we need to pass some model parameters (e.g. temperature,
> >> GPU/CPU settings) to the Python process, so we may need to enhance
> >> PyFlink to support such usage so that users can directly get the model
> >> parameters in the UDTF. After introducing such improvements, we can
> >> define the Python logic like this:
> >>
> >> ```
> >> from pyflink.table.udf import TableFunction, udtf
> >> from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
> >>
> >>
> >> class HuggingFaceFunc(TableFunction):
> >>    def __init__(self):
> >>        self.model = None
> >>        self.tokenizer = None
> >>        self.pipeline = None
> >>
> >>    def open(self, runtime_context):
> >>        # Read the `model` option (a local path or a model name).
> >>        model_dir = runtime_context.get_job_parameter("model", None)
> >>        ...
> >>        self.tokenizer = AutoTokenizer.from_pretrained(
> >>            model_dir, trust_remote_code=True
> >>        )
> >>
> >>        self.model = AutoModelForCausalLM.from_pretrained(
> >>            model_dir, device_map=device
> >>        )
> >>        self.pipeline = pipeline(
> >>            "text-generation", model=self.model, tokenizer=self.tokenizer
> >>        )
> >>        assert self.model is not None
> >>
> >>    def eval(self, content: str, comment: str):
> >>        # Emit the generated text together with its length.
> >>        output = self.pipeline(content)[0]["generated_text"]
> >>        yield output, len(output)
> >>
> >>
> >> # No `result_types` here: the output schema is meant to come from the DDL.
> >> HuggingFaceModelUDTF = udtf(HuggingFaceFunc())
> >> ```
> >>
> >> I have implemented 2 Python predict classes, one using HuggingFace and
> >> the other based on vLLM, to verify this design (the code can be found here
> >> <https://github.com/bgeng777/flink/blob/bgeng777/python-ml-predict-poc/flink-end-to-end-tests/flink-python-test/python/huggingface_udtf.py>).
> >> So to conclude, I am wondering if we really need a new Python `class
> >> PredictFunction(TableFunction)`.
> >>
> >> Thanks for reading and looking forward to your thoughts.
> >>
> >> Best,
> >> Biao Geng
> >>
> >>
> >>> On Nov 21, 2025, at 19:28, Geng Biao <[email protected]> wrote:
> >>>
> >>> Hi Swapna,
> >>>
> >>> Thanks for the FLIP and sorry for the late reply. After reading it, I
> >>> have some comments about the proposal.
> >>>
> >>> 1. I notice that the parameter `model-directory-path` is introduced. It
> >>> should work, but I see that some popular Python model libraries like
> >>> `transformers` and `vllm` tend to directly use the parameter name
> >>> `model`, which could be a local path or a simple name like
> >>> `Qwen/Qwen3-0.6B`. Then they check whether `model` is a path or a simple
> >>> name and automatically download it to a local cache directory if
> >>> necessary. Considering use cases based on these libraries, maybe
> >>> `model-directory-path` can be renamed to `model`, which can be either a
> >>> real path or a simple model name.
> >>>
> >>> 2. There is a `set_model_config` in the Python PredictFunction
> >>> interface. I find that in the previous discussion, you were considering
> >>> passing PredictRuntimeContext in `open`. I agree with this choice as
> >>> well; it makes the logic clearer, and users can just use the configs in
> >>> the `open` method to init their model. I just find that the FLIP still
> >>> has `set_model_config`, so I want to double-check it here.
> >>>
> >>>
> >>>
> >>> Thanks for your time!
> >>>
> >>> Best,
> >>> Biao Geng
> >>>
> >>>> On Sep 22, 2025, at 23:36, Swapna Marru <[email protected]> wrote:
> >>>>
> >>>> Hi Devs,
> >>>>
> >>>>
> >>>> I am interested in learning more about the MODEL, ML_PREDICT, and
> >>>> ML_EVALUATE functionalities added in the following FLIP.
> >>>>
> >>>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-437%3A+Support+ML+Models+in+Flink+SQL
> >>>>
> >>>>
> >>>> I see the original FLIP has extensibility to local model providers in
> >>>> Flink.
> >>>>
> >>>>
> >>>> Is there a way to do pluggable local model providers in Python? For
> >>>> example, generating embeddings using Sentence Transformer models
> >>>> running locally in Flink.
> >>>>
> >>>>
> >>>> An option could be to introduce a Model Provider factory
> >>>> implementation in Java that internally uses a predict function in
> >>>> Python. But I see this requires a lot of work on Java-to-Python
> >>>> communication/translation inside the provider.
> >>>>
> >>>>
> >>>> Something like PythonRuntimeProvider along with
> >>>> PredictRuntimeProvider / AsyncRuntimeProvider, which can handle
> >>>> Java -> Python translations out of the box, would be helpful to
> >>>> de-duplicate that effort.
> >>>>
> >>>>
> >>>> Can you please point me to any existing discussions related to this?
> >>>> Or are there any other ways to achieve the same? Please share your
> >>>> thoughts.
> >>>>
> >>>>
> >>>> -Thanks,
> >>>>
> >>>> Swapna Marru
>
>
