Re: [DISCUSS] Support scalar vectorized Python UDF in PyFlink

2020-02-11 Thread Dian Fu
Hi all, Thank you all for participating in this discussion and sharing your thoughts. It seems that we have reached consensus on the design now. I will start a VOTE thread if there is no further feedback. Thanks, Dian On Tue, Feb 11, 2020 at 10:23 AM Dian Fu wrote: > Hi Jingsong, > > You're right. I…

Re: [DISCUSS] Support scalar vectorized Python UDF in PyFlink

2020-02-10 Thread Dian Fu
Hi Jingsong, You're right. I have updated the FLIP to reflect this. Thanks, Dian > On Feb 11, 2020, at 10:03 AM, Jingsong Li wrote: > > Hi Dian and Jincheng, > > Thanks for your explanation. Thinking again, maybe most users don't want to > modify these parameters. > We all realize that "batch.size" s…

Re: [DISCUSS] Support scalar vectorized Python UDF in PyFlink

2020-02-10 Thread Jingsong Li
Hi Dian and Jincheng, Thanks for your explanation. Thinking again, maybe most users don't want to modify these parameters. We all realize that "batch.size" should be a larger value, so "bundle.size" must also be increased. Now the default value of "bundle.size" is only 1000. I think you can update…

Re: [DISCUSS] Support scalar vectorized Python UDF in PyFlink

2020-02-10 Thread Dian Fu
Hi Jincheng, Hequn & Jingsong, Thanks a lot for your suggestions. I have created FLIP-97[1] for this feature. > One little suggestion: maybe it would be nice if we can add some performance explanation in the document? (I'm just very curious :)) Thanks for the suggestion. I have updated the design doc…

Re: [DISCUSS] Support scalar vectorized Python UDF in PyFlink

2020-02-10 Thread jincheng sun
Hi Jingsong, Thanks for your feedback! I would like to share my thoughts regarding the following question: >> - Can we only configure one parameter and calculate the other automatically? For example, if we just want to "pipeline", and "bundle.size" is twice as much as "batch.size", would this work? I don't…

Re: [DISCUSS] Support scalar vectorized Python UDF in PyFlink

2020-02-09 Thread Jingsong Li
Thanks Dian for your reply. +1 to creating a FLIP too. About "python.fn-execution.bundle.size" and "python.fn-execution.arrow.batch.size", I got what you mean about "pipeline". I agree. It seems that a batch should always be in a bundle, and the bundle size should always be bigger than the batch size. (if a batch…
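The bundle/batch relationship discussed in this thread (a batch never spans two bundles, so "bundle.size" should be at least "batch.size") can be sketched in plain Python. The helper name `split_bundle` is hypothetical, standing in for the Arrow batching that the Python operator would perform internally:

```python
def split_bundle(bundle, batch_size):
    """Split one bundle of rows into Arrow-sized batches.

    Because a batch never crosses a bundle boundary, the last batch of
    a bundle may contain fewer than batch_size rows.
    """
    return [bundle[i:i + batch_size] for i in range((0), len(bundle), batch_size)]

rows = list(range(10))           # a bundle of 10 rows
batches = split_bundle(rows, 4)  # arrow.batch.size = 4
# -> [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```

This also illustrates why the thread suggests keeping "bundle.size" a multiple of (or at least larger than) "batch.size": otherwise every bundle ends with a small, inefficient trailing batch.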

Re: [DISCUSS] Support scalar vectorized Python UDF in PyFlink

2020-02-09 Thread Hequn Cheng
Hi Dian, Thanks a lot for bringing up the discussion! It is great to see that the Pandas UDF feature is going to be introduced. I think this would improve the performance and also the usability of user-defined functions (UDFs) in Python. One little suggestion: maybe it would be nice if we can add some performance explanation in the document? (I'm just very curious :))

Re: [DISCUSS] Support scalar vectorized Python UDF in PyFlink

2020-02-09 Thread jincheng sun
Hi Dian, Thanks for bringing up this discussion. This is very important for the ecosystem of PyFlink. Adding support for Pandas greatly enriches the available UDF library of PyFlink and greatly improves its usability! +1 for supporting scalar vectorized Python UDFs. I think we should create a FLIP…

Re: [DISCUSS] Support scalar vectorized Python UDF in PyFlink

2020-02-05 Thread dianfu
Hi Jingsong, Thanks a lot for the valuable feedback. 1. The configurations "python.fn-execution.bundle.size" and "python.fn-execution.arrow.batch.size" are used for separate purposes and I think they are both needed. If they are unified, the Python operator has to wait for the execution results of…
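The two independent knobs discussed here could be set separately in the Flink configuration. The option names are taken from the thread; the values below are purely illustrative, not recommendations:

```yaml
# How many elements the Java operator buffers per bundle before
# synchronizing with the Python worker (affects pipelining and
# checkpoint latency).
python.fn-execution.bundle.size: 100000

# Maximum number of rows per Arrow batch handed to the Pandas UDF
# (affects the size of each pandas.Series the UDF receives).
python.fn-execution.arrow.batch.size: 10000
```

Keeping them separate allows tuning the pipelining granularity (bundle) independently of the per-call batch the UDF sees, which is the point Dian makes above.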

Re: [DISCUSS] Support scalar vectorized Python UDF in PyFlink

2020-02-05 Thread Jingsong Li
Hi Dian, +1 for this, thanks for driving it. The documentation looks very good. I can imagine a huge performance improvement and better integration with other Python libraries. A few thoughts: - About data splitting: "python.fn-execution.arrow.batch.size", can we unify it with "python.fn-execution.bundle.size"? -…

[DISCUSS] Support scalar vectorized Python UDF in PyFlink

2020-02-04 Thread dianfu
Hi all, Scalar Python UDF has already been supported in the coming release 1.10 (FLIP-58[1]). It operates one row at a time, working as follows: the Java operator serializes one input row to bytes and sends them to the Python worker; the Python worker deserializes the input row and evaluates…
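The contrast between the existing row-at-a-time scalar UDF and the proposed vectorized (Pandas) UDF can be illustrated with a minimal sketch. Plain Python lists stand in for the pandas.Series / Arrow batches a real Pandas UDF would receive, and the function names are hypothetical:

```python
# Row-at-a-time scalar UDF (FLIP-58 style): invoked once per input row,
# paying serialization overhead on every call.
def plus_one(x):
    return x + 1

# Vectorized (Pandas-style) scalar UDF: invoked once per batch of rows,
# amortizing the serialization cost across the whole batch. A plain list
# stands in here for the pandas.Series a real Pandas UDF operates on.
def plus_one_vectorized(batch):
    return [x + 1 for x in batch]

rows = [1, 2, 3]
# Both styles produce the same result; the vectorized form needs only
# one invocation for the whole batch.
assert [plus_one(x) for x in rows] == plus_one_vectorized(rows) == [2, 3, 4]
```

With the vectorized form, the Python worker can also hand the batch directly to Pandas/NumPy operations, which is where the performance gain discussed in this thread comes from.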