Hi dev, I'd like to start a vote on FLIP-466.
Discussion thread: https://lists.apache.org/thread/sw2or62299w0hw9jm5kdqjqj3j8rnrdt
FLIP: https://cwiki.apache.org/confluence/display/FLINK/FLIP-466%3A+Introduce+ProcessFunction+Attribute+in+DataStream+API+V2

Best regards,
Wencong Liu

At 2024-07-15 10:53:00, "Xuannan Su" <suxuanna...@gmail.com> wrote:
>Hi, Wencong,
>Thanks for driving the FLIP.
>
>+1 for this FLIP.
>
>It is a useful mechanism to let the operator provide the hint for the
>engine to optimize the execution.
>
>Best regards,
>Xuannan
>
>On Sat, Jul 6, 2024 at 3:53 PM Wencong Liu <liuwencle...@163.com> wrote:
>>
>> Hi Yuxin,
>> Thanks for the reply.
>> > For idempotence annotation, what's the specific behavior?
>>
>> StreamTask will reduce the frequency of output records. The frequency
>> will have a default value and can also be set through configuration options.
>> The specific rules will be described in detail in the subsequent FLIP.
>>
>> Best,
>> Wencong
>>
>> At 2024-07-05 14:19:44, "Yuxin Tan" <tanyuxinw...@gmail.com> wrote:
>> >Hi, Wencong,
>> >Thanks for driving the FLIP.
>> >
>> >+1 for this FLIP.
>> >
>> >I believe these hints will improve the performance in many use cases.
>> >I only have a minor question about the Idempotence annotation. When
>> >this annotation is added, how does StreamTask optimize the frequency?
>> >Does it ensure a single output, or does it merely reduce the frequency
>> >of the outputs?
>> >
>> >Best,
>> >Yuxin
>> >
>> >On Mon, Jul 1, 2024 at 4:39 PM Wencong Liu <liuwencle...@163.com> wrote:
>> >
>> >> Hi, Jeyhun,
>> >> Thanks for the reply.
>> >>
>> >> > Integrate these annotations with the execution plan.
>> >> I believe DataStream is an imperative API, which means
>> >> that the actual execution plan is basically consistent with
>> >> the computational logic expressed by the user with DataStream,
>> >> and it is different from SQL, so the significance of supporting
>> >> getExecutionPlan in the short term may not be great.
>> >> If it is to be supported later, it is possible to consider the impact of Hints.
>> >>
>> >> > Check for misuse of attributes or ignore it.
>> >> For illegal use (annotated on the inappropriate ProcessFunction),
>> >> an exception will be thrown. For legal use, the framework can also
>> >> choose to ignore it.
>> >>
>> >> > A framework to include attributes.
>> >> Yes, we will provide a basic framework in the implementation
>> >> to help developers for extension.
>> >>
>> >> Best,
>> >> Wencong
>> >>
>> >> At 2024-06-28 02:06:37, "Jeyhun Karimov" <je.kari...@gmail.com> wrote:
>> >> >Hi Wencong,
>> >> >
>> >> >Thanks for the FLIP. +1 for it.
>> >> >
>> >> >Providing hints to users will enable more optimization potential for
>> >> >DSv2.
>> >> >I have a few questions.
>> >> >
>> >> >I think currently, DSv2 ExecutionEnvironment does not support getting
>> >> >execution plan (getExecutionPlan()).
>> >> >Do you plan to integrate these annotations with the execution plan?
>> >> >
>> >> >Any plans to check for misuse of attributes? Or any plans for a framework
>> >> >to implicitly include attributes?
>> >> >
>> >> >Also, now that we make analogy with SQL hints, SQL query planners usually
>> >> >ignore wrong hints and continue with its best plan.
>> >> >Do we want to consider this approach? Or should we throw exception whenever
>> >> >the hint (attribute in this case) is wrong?
>> >> >
>> >> >Regards,
>> >> >Jeyhun
>> >> >
>> >> >On Thu, Jun 27, 2024 at 7:47 AM Xintong Song <tonysong...@gmail.com> wrote:
>> >> >
>> >> >> +1 for this FLIP.
>> >> >>
>> >> >> I think this is similar to SQL hints, where users can provide optional
>> >> >> information to help the engine execute the workload more efficiently.
>> >> >> Having a unified mechanism for such kind of hints should improve usability
>> >> >> compared to introducing tons of configuration knobs.
>> >> >>
>> >> >> Best,
>> >> >>
>> >> >> Xintong
>> >> >>
>> >> >> On Wed, Jun 26, 2024 at 8:09 PM Wencong Liu <liuwencle...@163.com> wrote:
>> >> >>
>> >> >> > Hi devs,
>> >> >> >
>> >> >> > I'm proposing a new FLIP[1] to introduce the ProcessFunction Attribute in
>> >> >> > the DataStream API V2. The goal is to optimize job execution by leveraging
>> >> >> > additional information about users' ProcessFunction logic. The proposal
>> >> >> > includes the following scenarios where the ProcessFunction Attribute can
>> >> >> > significantly enhance optimization:
>> >> >> >
>> >> >> > Scenario 1: If the framework recognizes that the ProcessFunction outputs
>> >> >> > data only after all input is received, the downstream operators need not
>> >> >> > be scheduled until the ProcessFunction is finished, which effectively
>> >> >> > reduces resource consumption. Ignoring this information could lead to
>> >> >> > premature scheduling of downstream operators with no data to process.
>> >> >> > This scenario is addressed and optimized by FLIP-331[2].
>> >> >> >
>> >> >> > Scenario 2: For stream processing, where users are only interested in the
>> >> >> > latest result per key at the current time, the framework can optimize by
>> >> >> > adjusting the frequency of ProcessFunction outputs. This reduces shuffle
>> >> >> > data volume and downstream operator workload. If this optimization is
>> >> >> > ignored, each new input would trigger a new output. This scenario is
>> >> >> > addressed and optimized by FLIP-365[3].
>> >> >> >
>> >> >> > Scenario 3: If a user's ProcessFunction neither caches inputs nor outputs,
>> >> >> > recognizing this can enable object reuse for this data within the
>> >> >> > OperatorChain, enhancing performance. Without this optimization, data
>> >> >> > would be copied before being passed to the next operator. This scenario
>> >> >> > is addressed and optimized by FLIP-329[4].
>> >> >> >
>> >> >> > To unify the mechanism for utilizing additional information and optimizing
>> >> >> > jobs, we propose introducing the ProcessFunction Attribute represented by
>> >> >> > Java annotations, which allow users to provide relevant information about
>> >> >> > their ProcessFunctions. The framework can then use this to optimize job
>> >> >> > execution. Importantly, regular job execution remains unaffected whether
>> >> >> > users use this attribute or not.
>> >> >> >
>> >> >> > Looking forward to discussing this in the upcoming FLIP.
>> >> >> >
>> >> >> > Best regards,
>> >> >> > Wencong Liu
>> >> >> >
>> >> >> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-466%3A+Introduce+ProcessFunction+Attribute+in+DataStream+API+V2
>> >> >> > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-331%3A+Support+EndOfStreamTrigger+and+isOutputOnlyAfterEndOfStream+operator+attribute+to+optimize+task+deployment
>> >> >> > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-365%3A+Introduce+flush+interval+to+adjust+the+interval+of+emitting+results+with+idempotent+semantics
>> >> >> > [4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-329%3A+Add+operator+attribute+to+specify+support+for+object-reuse
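
Editorial illustration, not part of the original messages: the thread describes attributes expressed as Java annotations on a ProcessFunction, with an exception thrown for illegal use. A minimal sketch of that idea might look like the following. Every name here (`OutputsOnlyAtEndOfInput`, `SketchProcessFunction`, `AttributeSketch`) is hypothetical and chosen only for this example. None of it is taken from the FLIP or from the Flink code base, and the real DataStream API V2 interfaces may differ.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.function.Consumer;

// Hypothetical attribute: the function emits output only after all input is consumed.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface OutputsOnlyAtEndOfInput {}

// Hypothetical stand-in for a process function interface (not Flink's real one).
interface SketchProcessFunction<IN, OUT> {
    void processRecord(IN record, Consumer<OUT> output);

    default void endInput(Consumer<OUT> output) {}
}

// Legal use: accumulates during input and emits once at the end.
@OutputsOnlyAtEndOfInput
class SumAll implements SketchProcessFunction<Integer, Integer> {
    private int sum;

    @Override
    public void processRecord(Integer record, Consumer<Integer> output) {
        sum += record;
    }

    @Override
    public void endInput(Consumer<Integer> output) {
        output.accept(sum);
    }
}

// No attribute: emits one output per input record.
class PassThrough implements SketchProcessFunction<Integer, Integer> {
    @Override
    public void processRecord(Integer record, Consumer<Integer> output) {
        output.accept(record);
    }
}

// Illegal use: the attribute is placed on a class that is not a process function.
@OutputsOnlyAtEndOfInput
class NotAFunction {}

final class AttributeSketch {
    // Returns true if the type carries the attribute; rejects misplaced attributes.
    static boolean outputsOnlyAtEnd(Class<?> type) {
        boolean annotated = type.isAnnotationPresent(OutputsOnlyAtEndOfInput.class);
        if (annotated && !SketchProcessFunction.class.isAssignableFrom(type)) {
            throw new IllegalArgumentException(
                    "Attribute used on a type that is not a process function: " + type.getName());
        }
        return annotated;
    }

    public static void main(String[] args) {
        System.out.println(outputsOnlyAtEnd(SumAll.class));      // annotated function
        System.out.println(outputsOnlyAtEnd(PassThrough.class)); // plain function
    }
}
```

In this sketch a scheduler could consult `outputsOnlyAtEnd` before deciding when to deploy downstream operators (Scenario 1). A function without the attribute simply takes the regular path, which matches the statement in the proposal that regular job execution is unaffected when the attribute is absent.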