Hi Thomas,

Thanks for the confirmation. I will now start a vote.
Best,
Xingbo

Thomas Weise <t...@apache.org> wrote on Wed, Jan 12, 2022 at 02:20:

> Hi Xingbo,
>
> +1 from my side
>
> Thanks for the clarification. For your use case the parameter size and
> therefore serialization overhead was the limiting factor. I have seen
> use cases where that is not the concern, because the Python logic
> itself is heavy and dwarfs the protocol overhead (for example when
> interacting with external systems from the UDF). Hence it is good to
> give users options to optimize for their application requirements.
>
> Cheers,
> Thomas
>
> On Tue, Jan 11, 2022 at 3:44 AM Xingbo Huang <hxbks...@gmail.com> wrote:
>
>> Hi everyone,
>>
>> Thanks to all of you for the discussion. If there are no objections, I
>> would like to start a vote thread tomorrow.
>>
>> Best,
>> Xingbo
>>
>> Xingbo Huang <hxbks...@gmail.com> wrote on Fri, Jan 7, 2022 at 16:18:
>>
>>> Hi Till,
>>>
>>> I have written a more complicated PyFlink job. Compared with the
>>> previous single Python UDF job, there is an extra stage of converting
>>> between table and datastream. Besides, I added a Python map function
>>> to the job. Because the Python DataStream API has not yet implemented
>>> Thread Mode, the Python map function operator still runs in Process
>>> Mode.
>>>
>>> ```
>>> source = t_env.from_path("source_table")  # schema [id: String, d: int]
>>>
>>> @udf(result_type=DataTypes.STRING(), func_type="general")
>>> def upper(x):
>>>     return x.upper()
>>>
>>> t_env.create_temporary_system_function("upper", upper)
>>>
>>> # Python map function
>>> ds = t_env.to_data_stream(source) \
>>>     .map(lambda x: x,
>>>          output_type=Types.ROW_NAMED(["id", "d"],
>>>                                      [Types.STRING(), Types.INT()]))
>>>
>>> t = t_env.from_data_stream(ds)
>>> t.select('upper(id)').execute_insert('sink_table')
>>> ```
>>>
>>> The input data size is 1 KB.
>>>
>>> Mode                        | QPS
>>> ----------------------------|-------
>>> Process Mode                | 30,000
>>> Thread Mode + Process Mode  | 40,000
>>>
>>> From the table, we can see that the nodes running in Process Mode are
>>> the performance bottleneck of the job.
>>>
>>> Best,
>>> Xingbo
>>>
>>> Till Rohrmann <trohrm...@apache.org> wrote on Wed, Jan 5, 2022 at 23:16:
>>>
>>>> Thanks for the detailed answer Xingbo. Quick question on the last
>>>> figure in the FLIP. You said that this is a real-world Flink stream
>>>> SQL job. The title of the graph says UDF(String Upper). So do I
>>>> understand correctly that string upper is the real-world use case you
>>>> have measured? What I wanted to ask is how a slightly more complex
>>>> Flink Python job (involving shuffles, with back pressure, etc.)
>>>> performs using the thread and process mode respectively.
>>>>
>>>> If the mode solely needs changes in the Python part of Flink, then I
>>>> don't have any concerns from the runtime perspective.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Jan 5, 2022 at 1:55 PM Xingbo Huang <hxbks...@gmail.com> wrote:
>>>>
>>>>> Hi Till and Thomas,
>>>>>
>>>>> Thanks a lot for joining the discussion.
>>>>>
>>>>> For Till:
>>>>>
>>>>> >>> Is the slower performance currently the biggest pain point for
>>>>> our Python users? What else are our Python users mainly complaining
>>>>> about?
>>>>>
>>>>> PyFlink users are most concerned about two things: one is better
>>>>> usability, the other is performance. Users often run some benchmarks
>>>>> when they first investigate PyFlink[1][2] to decide whether to use
>>>>> it. The performance of a PyFlink job depends on two parts: the
>>>>> overhead of the PyFlink framework, and the complexity of the Python
>>>>> function implemented by the user.
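A quick way to see how those two parts interact: per-record time is the sum of framework overhead and user-function time, so reducing framework overhead matters most when the UDF itself is cheap. A minimal standalone sketch (the microsecond figures below are hypothetical, not measurements from this thread):

```python
# Per-record time = framework overhead + user-function time,
# so QPS = 1e6 / (framework_us + udf_us) for times in microseconds.
def job_qps(framework_us: float, udf_us: float) -> float:
    return 1e6 / (framework_us + udf_us)

# Hypothetical timings: halving the framework overhead helps a cheap UDF
# far more than a heavy one.
cheap_before = job_qps(30.0, 3.0)    # framework dominates
cheap_after  = job_qps(15.0, 3.0)
heavy_before = job_qps(30.0, 300.0)  # UDF dominates
heavy_after  = job_qps(15.0, 300.0)

print(f"cheap UDF speedup: {cheap_after / cheap_before:.2f}x")   # ~1.83x
print(f"heavy UDF speedup: {heavy_after / heavy_before:.2f}x")   # ~1.05x
```

This is the same point Thomas makes later in the thread: for heavy function workloads the protocol overhead is negligible, while for cheap UDFs it dominates.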
>>>>> In the Python ecosystem, there are many libraries and tools that can
>>>>> help Python users improve the performance of their custom functions,
>>>>> such as pandas[3], numba[4] and cython[5]. So we hope that the
>>>>> framework overhead of PyFlink itself can also be reduced.
>>>>>
>>>>> >>> Concerning the proposed changes, are there any changes required
>>>>> on the runtime side (changes to Flink)? How will the deployment and
>>>>> memory management be affected when using the thread execution mode?
>>>>>
>>>>> The changes to the PyFlink runtime mentioned here are actually only
>>>>> modifications of PyFlink's custom operators, such as
>>>>> PythonScalarFunctionOperator[6], which won't affect deployment or
>>>>> memory management.
>>>>>
>>>>> >>> One more question that came to my mind: How much performance
>>>>> improvement do we gain on a real-world Python use case? Were the
>>>>> measurements more like micro benchmarks where the Python UDF was
>>>>> called w/o the overhead of Flink? I would just be curious how much
>>>>> the Python component contributes to the overall runtime of a
>>>>> real-world job. Do we have some data on this?
>>>>>
>>>>> The last figure I put in the FLIP is a performance comparison of
>>>>> three real Flink streaming SQL jobs: a Java UDF job, a Python UDF job
>>>>> in Process Mode, and a Python UDF job in Thread Mode. The QPS figures
>>>>> are end-to-end Flink job execution results. As the chart shows, a
>>>>> Python UDF with the same functionality can often only reach 20% of
>>>>> the performance of the equivalent Java UDF, so the Python UDF will
>>>>> often become the performance bottleneck in a PyFlink job.
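A common way those libraries cut the per-record Python cost is vectorization: processing a whole batch per call instead of one row per call. A minimal sketch with plain pandas (PyFlink exposes this style via vectorized UDFs declared with `func_type="pandas"`; the function names here are illustrative, not part of any API):

```python
import pandas as pd

# Per-row version: one Python call (plus framework overhead) per record.
def upper_scalar(x: str) -> str:
    return x.upper()

# Vectorized version: one call handles a whole batch of rows, amortizing
# call and serialization overhead across the batch.
def upper_vectorized(s: pd.Series) -> pd.Series:
    return s.str.upper()

batch = pd.Series(["flink", "python", "udf"])
assert upper_vectorized(batch).tolist() == [upper_scalar(x) for x in batch]
print(upper_vectorized(batch).tolist())  # ['FLINK', 'PYTHON', 'UDF']
```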
>>>>> For Thomas:
>>>>>
>>>>> The first time I realized that the framework overhead of the various
>>>>> IPC mechanisms (socket, gRPC, shared memory) cannot be ignored in
>>>>> some scenarios was due to an image-algorithm prediction job in
>>>>> PyFlink. Its input parameters are a series of huge image binary
>>>>> arrays, with a data size bigger than 1 GB. The
>>>>> serialization/deserialization overhead became an important part of
>>>>> its poor performance. Although this job is a bit extreme, through
>>>>> measurement we did confirm the impact that the
>>>>> serialization/deserialization overhead of larger parameters has on
>>>>> the performance of the IPC framework.
>>>>>
>>>>> >>> As I understand it, you measured the difference in throughput for
>>>>> UPPER between process and embedded mode and the difference is 50%
>>>>> increased throughput?
>>>>>
>>>>> This 50% is the result when the data size is less than 100 bytes.
>>>>> When the data size reaches 1 KB, the performance of Embedded Mode
>>>>> reaches about 3.5 times that of the Process Mode shown in the FLIP.
>>>>> When the data reaches 1 MB, Embedded Mode can reach 5 times the
>>>>> performance of Process Mode. The biggest difference here is that in
>>>>> Embedded Mode, input/result data does not need to be
>>>>> serialized/deserialized.
>>>>>
>>>>> >>> Is that a typical UDF in your usage?
>>>>>
>>>>> The reason for choosing UPPER is that a simpler UDF implementation
>>>>> makes it easier to evaluate the performance of the different
>>>>> execution modes.
>>>>>
>>>>> >>> What do you observe when the function becomes more complex?
>>>>>
>>>>> We can analyze the QPS of the framework (Process Mode or Embedded
>>>>> Mode) and the QPS of the UDF calculation logic separately.
>>>>> A more complex UDF simply means a UDF with a smaller QPS of its own.
>>>>> The main factors that affect the framework QPS are the data types,
>>>>> number and size of the parameters, which greatly affect the
>>>>> serialization/deserialization overhead in Process Mode.
>>>>>
>>>>> The purpose of introducing Thread Mode is not to replace Process
>>>>> Mode, but to cover additional Python UDF usage scenarios such as CEP
>>>>> and join, as well as scenarios where higher performance is pursued.
>>>>> Compared with Thread Mode, Process Mode has better isolation, which
>>>>> avoids the limitations Thread Mode has in some scenarios such as
>>>>> session mode.
>>>>>
>>>>> [1] https://www.mail-archive.com/user@flink.apache.org/msg42760.html
>>>>> [2] https://www.mail-archive.com/user@flink.apache.org/msg44975.html
>>>>> [3] https://pandas.pydata.org/
>>>>> [4] https://numba.pydata.org/
>>>>> [5] https://cython.org/
>>>>> [6] https://github.com/apache/flink/blob/master/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java
>>>>>
>>>>> Best,
>>>>> Xingbo
>>>>>
>>>>> Thomas Weise <t...@apache.org> wrote on Tue, Jan 4, 2022 at 04:23:
>>>>>
>>>>>> Interesting discussion. It caught my attention because I was also
>>>>>> interested in the Beam fn execution overhead a few years ago.
>>>>>>
>>>>>> We found back then that while in theory the fn protocol overhead is
>>>>>> very significant, for realistic function workloads that overhead was
>>>>>> negligible. And of course it all depends on the use case. It might
>>>>>> be worthwhile to quantify a couple more scenarios.
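One such scenario is easy to quantify standalone: how the serialize/deserialize share of the per-record cost grows with parameter size. The sketch below uses `pickle` as a stand-in for the real wire format, so the absolute numbers are only indicative of the trend:

```python
import pickle
import time

def roundtrip_qps(payload: bytes, n: int = 200) -> float:
    """Approximate serialize + deserialize round-trips per second."""
    start = time.perf_counter()
    for _ in range(n):
        pickle.loads(pickle.dumps(payload))
    return n / (time.perf_counter() - start)

# Roughly the parameter sizes discussed in this thread.
for size in (100, 1_000, 1_000_000):
    qps = roundtrip_qps(b"x" * size)
    print(f"{size:>9} B payload: {qps:>12,.0f} round-trips/s")
```

The achievable round-trip rate drops sharply as the payload grows, which matches the report above that Embedded Mode's advantage over Process Mode widens from ~1.5x at 100 B to ~5x at 1 MB.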
>>>>>> As I understand it, you measured the difference in throughput for
>>>>>> UPPER between process and embedded mode and the difference is 50%
>>>>>> increased throughput? Is that a typical UDF in your usage? What do
>>>>>> you observe when the function becomes more complex?
>>>>>>
>>>>>> Thanks,
>>>>>> Thomas
>>>>>>
>>>>>> On Mon, Jan 3, 2022 at 5:52 AM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>
>>>>>>> One more question that came to my mind: How much performance
>>>>>>> improvement do we gain on a real-world Python use case? Were the
>>>>>>> measurements more like micro benchmarks where the Python UDF was
>>>>>>> called w/o the overhead of Flink? I would just be curious how much
>>>>>>> the Python component contributes to the overall runtime of a
>>>>>>> real-world job. Do we have some data on this?
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Mon, Jan 3, 2022 at 11:45 AM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi Xingbo,
>>>>>>>>
>>>>>>>> Thanks for creating this FLIP. I have two general questions about
>>>>>>>> the motivation for this FLIP because I have only very little
>>>>>>>> exposure to our Python users:
>>>>>>>>
>>>>>>>> Is the slower performance currently the biggest pain point for our
>>>>>>>> Python users?
>>>>>>>>
>>>>>>>> What else are our Python users mainly complaining about?
>>>>>>>>
>>>>>>>> Concerning the proposed changes, are there any changes required on
>>>>>>>> the runtime side (changes to Flink)? How will the deployment and
>>>>>>>> memory management be affected when using the thread execution mode?
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Fri, Dec 31, 2021 at 9:46 AM Xingbo Huang <hxbks...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Wei,
>>>>>>>>>
>>>>>>>>> Thanks a lot for your feedback. Very good questions!
>>>>>>>>>
>>>>>>>>> >>> 1. It seems that we dynamically load an embedded Python and
>>>>>>>>> user dependencies in the TM process. Can they be uninstalled
>>>>>>>>> cleanly after the task finishes? i.e. Can we use Thread Mode in
>>>>>>>>> session mode and the PyFlink shell?
>>>>>>>>>
>>>>>>>>> I mentioned this limitation in the FLIP. There is no problem as
>>>>>>>>> long as the Python interpreter does not change, but if you need
>>>>>>>>> to change the Python interpreter, there is really no way to
>>>>>>>>> reload the Python library. The problem is mainly caused by many
>>>>>>>>> Python libraries assuming that they own the process alone.
>>>>>>>>>
>>>>>>>>> >>> 2. Does one TM have only one embedded Python running at the
>>>>>>>>> same time? If all the Python operators in the TM share the same
>>>>>>>>> PVM, will there be a loss in performance?
>>>>>>>>>
>>>>>>>>> Your understanding is correct: one TM has only one embedded
>>>>>>>>> Python running at the same time. I guess you are worried about
>>>>>>>>> the performance loss of multiple threads caused by the Python
>>>>>>>>> GIL. There is a one-to-one correspondence between Java worker
>>>>>>>>> threads and Python subinterpreters.
>>>>>>>>> Although subinterpreters have not yet completely overcome the GIL
>>>>>>>>> sharing problem (the Python community's recent proposal for a
>>>>>>>>> per-interpreter GIL is also under discussion[1]), the performance
>>>>>>>>> of subinterpreters is very close to that of multiprocessing[2].
>>>>>>>>>
>>>>>>>>> >>> 3. How do we load the relevant C library if the
>>>>>>>>> python.executable is provided by users?
>>>>>>>>>
>>>>>>>>> Once python.executable is provided, PEMJA will dynamically load
>>>>>>>>> the CPython library (libpython*.so or libpython*.dylib) and the
>>>>>>>>> pemja.so installed in that Python environment.
>>>>>>>>>
>>>>>>>>> >>> May there be a risk of version conflicts?
>>>>>>>>>
>>>>>>>>> I understand this question to be about whether C/C++ has a way to
>>>>>>>>> solve the problem of depending on different versions of a
>>>>>>>>> library. First, we know that with static linking only, there is
>>>>>>>>> no such problem. I have studied the source code of CPython[3],
>>>>>>>>> and there is no usage of dynamic linking there. What remains is
>>>>>>>>> the case where dynamic linking is used in a C library written by
>>>>>>>>> the users. There are many ways to deal with this under dynamic
>>>>>>>>> linking, but since such a library is written by users, it is
>>>>>>>>> difficult for us to guarantee that there will be no conflicts. In
>>>>>>>>> that case, Process Mode will be the fallback choice.
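The library lookup described above can be approximated from within Python itself: the standard-library `sysconfig` module records where a CPython build keeps its shared library. A sketch (the exact file name, e.g. `libpython3.10.so`, depends on the build; framework builds on macOS differ):

```python
import os
import sysconfig

# Directory and file name of this interpreter's CPython library, as
# recorded at build time. An embedder such as PEMJA needs to resolve an
# equivalent path for whatever python.executable the user configured.
libdir = sysconfig.get_config_var("LIBDIR") or ""
ldlibrary = sysconfig.get_config_var("LDLIBRARY") or ""

print(os.path.join(libdir, ldlibrary))
```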
>>>>>>>>> [1] https://mail.python.org/archives/list/python-...@python.org/thread/S5GZZCEREZLA2PEMTVFBCDM52H4JSENR/#RIK75U3ROEHWZL4VENQSQECB4F4GDELV
>>>>>>>>> [2] https://mail.python.org/archives/list/python-...@python.org/thread/PNLBJBNIQDMG2YYGPBCTGOKOAVXRBJWY/#L5OXHXPFONRKLR3W6U46LUSUIBN4FCZQ
>>>>>>>>> [3] https://github.com/python/cpython
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Xingbo
>>>>>>>>>
>>>>>>>>> Wei Zhong <weizhong0...@gmail.com> wrote on Fri, Dec 31, 2021 at 11:49:
>>>>>>>>>
>>>>>>>>>> Hi Xingbo,
>>>>>>>>>>
>>>>>>>>>> Thanks for creating this FLIP. Big +1 for it!
>>>>>>>>>>
>>>>>>>>>> I have some questions about the Thread Mode:
>>>>>>>>>>
>>>>>>>>>> 1. It seems that we dynamically load an embedded Python and user
>>>>>>>>>> dependencies in the TM process. Can they be uninstalled cleanly
>>>>>>>>>> after the task finishes? i.e. Can we use the Thread Mode in
>>>>>>>>>> session mode and the PyFlink shell?
>>>>>>>>>>
>>>>>>>>>> 2. Does one TM have only one embedded Python running at the same
>>>>>>>>>> time? If all the Python operators in the TM share the same PVM,
>>>>>>>>>> will there be a loss in performance?
>>>>>>>>>>
>>>>>>>>>> 3. How do we load the relevant C library if the
>>>>>>>>>> python.executable is provided by users? May there be a risk of
>>>>>>>>>> version conflicts?
>>>>>>>>>> Best,
>>>>>>>>>> Wei
>>>>>>>>>>
>>>>>>>>>> On Wed, Dec 29, 2021 at 11:56 AM, Xingbo Huang <hxbks...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>
>>>>>>>>>>> I would like to start a discussion thread on "Support PyFlink
>>>>>>>>>>> Runtime Execution in Thread Mode".
>>>>>>>>>>>
>>>>>>>>>>> We have provided the PyFlink Runtime framework to support
>>>>>>>>>>> Python user-defined functions since Flink 1.10. The PyFlink
>>>>>>>>>>> Runtime framework is called Process Mode, which depends on an
>>>>>>>>>>> inter-process communication architecture based on the Apache
>>>>>>>>>>> Beam Portability framework. Although starting a dedicated
>>>>>>>>>>> process to execute Python user-defined functions provides
>>>>>>>>>>> better resource isolation, it brings greater resource and
>>>>>>>>>>> performance overhead.
>>>>>>>>>>>
>>>>>>>>>>> In order to overcome the resource and performance problems of
>>>>>>>>>>> Process Mode, we propose a new execution mode which executes
>>>>>>>>>>> Python user-defined functions in the same thread instead of in
>>>>>>>>>>> a separate process.
>>>>>>>>>>>
>>>>>>>>>>> I have drafted FLIP-206[1]. Please feel free to reply to this
>>>>>>>>>>> email thread. Looking forward to your feedback!
>>>>>>>>>>> Best,
>>>>>>>>>>> Xingbo
>>>>>>>>>>>
>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-206%3A+Support+PyFlink+Runtime+Execution+in+Thread+Mode