Hi Jingsong,

You're right. I have updated the FLIP to reflect this.
Thanks,
Dian

> On Feb 11, 2020, at 10:03 AM, Jingsong Li <jingsongl...@gmail.com> wrote:
>
> Hi Dian and Jincheng,
>
> Thanks for your explanation. Thinking about it again, maybe most users
> won't want to modify these parameters.
> We all realize that "batch.size" should be a larger value, so
> "bundle.size" must also be increased. The current default value of
> "bundle.size" is only 1000. I think you can update the design to provide
> meaningful default values for "batch.size" and "bundle.size".
>
> Best,
> Jingsong Lee
>
> On Mon, Feb 10, 2020 at 4:36 PM Dian Fu <dian0511...@gmail.com> wrote:
>
>> Hi Jincheng, Hequn & Jingsong,
>>
>> Thanks a lot for your suggestions. I have created FLIP-97[1] for this
>> feature.
>>
>>> One little suggestion: maybe it would be nice if we could add some
>>> performance explanation in the document? (I'm just very curious :))
>> Thanks for the suggestion. I have updated the "Background" section of
>> the design doc to explain where the performance gains come from.
>>
>>> It seems that a batch should always be contained in a bundle, i.e. the
>>> bundle size should always be bigger than the batch size (if a batch
>>> cannot cross bundles). Can you explain this relationship in the
>>> document?
>> I have updated the design doc to explain more about these two
>> configurations.
>>
>>> In the batch world, the vectorization batch size is about 1024+. What
>>> do you think about the default value of the "batch" size?
>> Is there any link about where this value comes from? I have performed a
>> simple test for a Pandas UDF which performs a simple +1 operation. The
>> performance is best when the batch size is set to 5000. I think it
>> depends on the data type of each column, the functionality the Pandas
>> UDF performs, etc. However, I agree with you that we could give a
>> meaningful default value for the "batch" size which works in most
>> scenarios.
>>
>>> Can we only configure one parameter and calculate the other
>>> automatically?
>>> For example, if we just want to "pipeline", the "bundle.size" could be
>>> twice as much as the "batch.size"; would this work?
>> I agree with Jincheng that this is not feasible. I think that giving a
>> meaningful default value for the "batch.size" which works in most
>> scenarios is enough. What are your thoughts?
>>
>> Thanks,
>> Dian
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-97%3A+Support+Scalar+Vectorized+Python+UDF+in+PyFlink
>>
>> On Mon, Feb 10, 2020 at 4:25 PM jincheng sun <sunjincheng...@gmail.com>
>> wrote:
>>
>>> Hi Jingsong,
>>>
>>> Thanks for your feedback! I would like to share my thoughts regarding
>>> the following question:
>>>
>>>> - Can we only configure one parameter and calculate the other
>>>> automatically? For example, if we just want to "pipeline",
>>>> "bundle.size" is twice as much as "batch.size"; would this work?
>>>
>>> I don't think this works. These two configurations are used for
>>> different purposes and there is no direct relationship between them,
>>> so I guess we cannot infer one configuration from the other.
>>>
>>> Best,
>>> Jincheng
>>>
>>>
>>> On Mon, Feb 10, 2020 at 1:53 PM Jingsong Li <jingsongl...@gmail.com>
>>> wrote:
>>>
>>>> Thanks Dian for your reply.
>>>>
>>>> +1 to create a FLIP too.
>>>>
>>>> About "python.fn-execution.bundle.size" and
>>>> "python.fn-execution.arrow.batch.size", I got what you mean about
>>>> "pipelining". I agree.
>>>> It seems that a batch should always be contained in a bundle, i.e.
>>>> the bundle size should always be bigger than the batch size (if a
>>>> batch cannot cross bundles).
>>>> Can you explain this relationship in the document?
>>>>
>>>> I think the default values are very important; we can discuss:
>>>> - In the batch world, the vectorization batch size is about 1024+.
>>>> What do you think about the default value of the "batch" size?
>>>> - Can we only configure one parameter and calculate the other
>>>> automatically?
>>>> For example, if we just want to "pipeline", "bundle.size" is twice
>>>> as much as "batch.size"; would this work?
>>>>
>>>> Best,
>>>> Jingsong Lee
>>>>
>>>> On Mon, Feb 10, 2020 at 11:55 AM Hequn Cheng <he...@apache.org> wrote:
>>>>
>>>>> Hi Dian,
>>>>>
>>>>> Thanks a lot for bringing up the discussion!
>>>>>
>>>>> It is great to see that the Pandas UDFs feature is going to be
>>>>> introduced. I think this would improve the performance and also the
>>>>> usability of user-defined functions (UDFs) in Python.
>>>>> One little suggestion: maybe it would be nice if we could add some
>>>>> performance explanation in the document? (I'm just very curious :))
>>>>>
>>>>> +1 to create a FLIP for this big enhancement.
>>>>>
>>>>> Best,
>>>>> Hequn
>>>>>
>>>>> On Mon, Feb 10, 2020 at 11:15 AM jincheng sun
>>>>> <sunjincheng...@gmail.com> wrote:
>>>>>
>>>>>> Hi Dian,
>>>>>>
>>>>>> Thanks for bringing up this discussion. This is very important for
>>>>>> the ecosystem of PyFlink. Adding Pandas support greatly enriches
>>>>>> the available UDF library of PyFlink and greatly improves the
>>>>>> usability of PyFlink!
>>>>>>
>>>>>> +1 for supporting scalar vectorized Python UDFs.
>>>>>>
>>>>>> I think we should create a FLIP for this big enhancement. :)
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Best,
>>>>>> Jincheng
>>>>>>
>>>>>>
>>>>>> On Wed, Feb 5, 2020 at 6:01 PM dianfu <dia...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Jingsong,
>>>>>>>
>>>>>>> Thanks a lot for the valuable feedback.
>>>>>>>
>>>>>>> 1. The configurations "python.fn-execution.bundle.size" and
>>>>>>> "python.fn-execution.arrow.batch.size" are used for separate
>>>>>>> purposes and I think they are both needed. If they were unified,
>>>>>>> the Python operator would have to wait for the execution results
>>>>>>> of the previous batch of elements before processing the next
>>>>>>> batch. This means that the Python UDF execution could not be
>>>>>>> pipelined between batches.
>>>>>>> With separate configurations, there will be no such problems.
>>>>>>> 2. It means that the Java operator will convert input elements to
>>>>>>> the Arrow memory format and then send them to the Python worker,
>>>>>>> and vice versa. Regarding the zero-copy benefits provided by
>>>>>>> Arrow, we gain them automatically by using Arrow.
>>>>>>> 3. Good point! As all the classes of the Python module are
>>>>>>> written in Java and it's not suggested to introduce new Scala
>>>>>>> classes, I guess it's not easy to do so right now. But I think
>>>>>>> this is definitely a good improvement we can make in the future.
>>>>>>> 4. You're right, and we will add a series of Arrow ColumnVectors,
>>>>>>> one for each supported type.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Dian
>>>>>>>
>>>>>>>> On Feb 5, 2020, at 4:57 PM, Jingsong Li <jingsongl...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hi Dian,
>>>>>>>>
>>>>>>>> +1 for this, thanks for driving it.
>>>>>>>> The documentation looks very good. I can imagine a huge
>>>>>>>> performance improvement and better integration with other
>>>>>>>> Python libraries.
>>>>>>>>
>>>>>>>> A few thoughts:
>>>>>>>> - About data splitting: can we unify
>>>>>>>> "python.fn-execution.arrow.batch.size" with
>>>>>>>> "python.fn-execution.bundle.size"?
>>>>>>>> - Use of Apache Arrow as the exchange format: do you mean Arrow
>>>>>>>> supports zero-copy between Java and Python?
>>>>>>>> - It seems we could implement ArrowFieldWriter by code
>>>>>>>> generation, but it is OK for the initial version to use virtual
>>>>>>>> function calls.
>>>>>>>> - ColumnarRow for vectorized reading seems to require
>>>>>>>> implementing ArrowColumnVectors.
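[Editor's note: the bundle/batch relationship discussed in this thread can be
sketched in a few lines of plain Python. This is an illustrative sketch only;
`split_bundle_into_batches` is a hypothetical helper, not part of PyFlink's
actual internals. It shows why a batch never crosses a bundle boundary, so the
effective Arrow batch size is capped by the bundle size.]

```python
def split_bundle_into_batches(bundle, arrow_batch_size):
    """Yield consecutive batches of at most `arrow_batch_size` elements.

    A bundle is the unit of checkpointing/flushing; an Arrow batch is the
    unit of transfer to the Python worker. Batches are cut from within a
    single bundle, so batch size is effectively min(batch.size, bundle.size).
    """
    for start in range(0, len(bundle), arrow_batch_size):
        yield bundle[start:start + arrow_batch_size]

bundle = list(range(1000))          # bundle.size = 1000 (the current default)
batches = list(split_bundle_into_batches(bundle, 300))
print([len(b) for b in batches])    # [300, 300, 300, 100]
```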
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jingsong Lee
>>>>>>>>
>>>>>>>> On Wed, Feb 5, 2020 at 12:45 PM dianfu <dia...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> Scalar Python UDFs are already supported in the coming release
>>>>>>>>> 1.10 (FLIP-58[1]). They operate one row at a time: the Java
>>>>>>>>> operator serializes one input row to bytes and sends it to the
>>>>>>>>> Python worker; the Python worker deserializes the input row and
>>>>>>>>> evaluates the Python UDF with it; the result row is serialized
>>>>>>>>> and sent back to the Java operator.
>>>>>>>>>
>>>>>>>>> This suffers from the following problems:
>>>>>>>>> 1) High serialization/deserialization overhead
>>>>>>>>> 2) It's difficult to leverage the popular Python libraries used
>>>>>>>>> by data scientists, such as Pandas, NumPy, etc., which provide
>>>>>>>>> high-performance data structures and functions.
>>>>>>>>>
>>>>>>>>> Jincheng and I have discussed offline and we want to introduce
>>>>>>>>> vectorized Python UDFs to address the above problems. This
>>>>>>>>> feature has also been mentioned in the discussion thread about
>>>>>>>>> the Python API plan[2]. For vectorized Python UDFs, a batch of
>>>>>>>>> rows is transferred between the JVM and the Python VM in
>>>>>>>>> columnar format. The batch of rows will be converted to a
>>>>>>>>> collection of pandas.Series and given to the vectorized Python
>>>>>>>>> UDF, which can then leverage the popular Python libraries such
>>>>>>>>> as Pandas, NumPy, etc. for the Python UDF implementation.
>>>>>>>>>
>>>>>>>>> Please refer to the design doc[3] for more details; any
>>>>>>>>> feedback is welcome.
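[Editor's note: the "+1" Pandas UDF that Dian benchmarked earlier in this
thread can be sketched as below. This shows only the user-function side;
the registration/decorator API is not shown, since it was still under design
in FLIP-97 at the time of this thread. The key point is that the function
receives a whole batch as a pandas.Series rather than one row per call.]

```python
import pandas as pd

# A scalar vectorized (Pandas) UDF: called once per Arrow batch, with each
# input column materialized as a pandas.Series. The "+1" is executed by
# pandas' vectorized arithmetic rather than a per-row Python call.
def add_one(s: pd.Series) -> pd.Series:
    return s + 1

batch = pd.Series([1, 2, 3, 4])   # one column of one Arrow batch
result = add_one(batch)
print(result.tolist())            # [2, 3, 4, 5]
```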
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Dian
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-58%3A+Flink+Python+User-Defined+Stateless+Function+for+Table
>>>>>>>>> [2]
>>>>>>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-What-parts-of-the-Python-API-should-we-focus-on-next-tt36119.html
>>>>>>>>> [3]
>>>>>>>>> https://docs.google.com/document/d/1ls0mt96YV1LWPHfLOh8v7lpgh1KsCNx8B9RrW1ilnoE/edit#heading=h.qivada1u8hwd
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best, Jingsong Lee
>
> --
> Best, Jingsong Lee
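[Editor's note: the per-row serialization overhead described in point 1) of
the proposal can be seen even outside Flink. The sketch below uses `pickle`
as a stand-in for Flink's actual row coders (an assumption, purely for
illustration): row-at-a-time processing pays one serializer round-trip per
row, while the columnar batch pays one round-trip for the whole batch.]

```python
import pickle

rows = [(i, float(i)) for i in range(10000)]

# Row-at-a-time (FLIP-58 style): one serialize/deserialize round-trip
# per row, i.e. 10000 pickle calls here.
row_results = [pickle.loads(pickle.dumps(row)) for row in rows]

# Batched columnar style (FLIP-97 idea): transpose rows into columns and
# make a single round-trip for the entire batch.
columns = list(zip(*rows))
decoded_columns = pickle.loads(pickle.dumps(columns))
batch_results = list(zip(*decoded_columns))

# Both paths reconstruct identical data; the batched path amortizes the
# per-call overhead across the whole batch.
assert row_results == batch_results
print(len(batch_results))  # 10000
```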