Hi Yuxin,
Thanks for the reply.
> For idempotence annotation, what's the specific behavior?


StreamTask will reduce the frequency of output records rather than
guarantee a single output. This frequency will have a default value and
can also be set through configuration options. The specific rules will
be described in detail in the subsequent FLIP.
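For illustration only, here is a minimal sketch of what "reducing the output frequency" could mean for idempotent outputs. It is plain Java, not Flink's StreamTask code. The class LatestPerKeyBuffer and the flushIntervalMs parameter are hypothetical stand-ins for the default and configurable frequency mentioned above, and the real rules will come with the FLIP. Repeated updates to the same key between flushes collapse into one record, so a key can still be emitted once per interval rather than exactly once.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Flink's StreamTask: keeps only the latest value per key
// and emits the buffered values at most once per flush interval.
class LatestPerKeyBuffer<K, V> {
    private final long flushIntervalMs; // hypothetical stand-in for the configurable frequency
    private final Map<K, V> pending = new LinkedHashMap<>(); // preserves first-seen key order
    private long lastFlushMs;

    LatestPerKeyBuffer(long flushIntervalMs, long startMs) {
        if (flushIntervalMs <= 0) {
            throw new IllegalArgumentException("flushIntervalMs must be positive");
        }
        this.flushIntervalMs = flushIntervalMs;
        this.lastFlushMs = startMs;
    }

    // Buffers an update; returns the records to emit if the interval has elapsed, else an empty list.
    List<Map.Entry<K, V>> onRecord(K key, V value, long nowMs) {
        pending.put(key, value); // overwrite: idempotent output only needs the latest value per key
        if (nowMs - lastFlushMs >= flushIntervalMs) {
            return flush(nowMs);
        }
        return List.of();
    }

    // Emits every buffered latest value (e.g. on a timer or at end of input) and clears the buffer.
    List<Map.Entry<K, V>> flush(long nowMs) {
        List<Map.Entry<K, V>> out = new ArrayList<>();
        for (Map.Entry<K, V> e : pending.entrySet()) {
            // Copy into immutable entries independent of the map (keys and values must be non-null).
            out.add(Map.entry(e.getKey(), e.getValue()));
        }
        pending.clear();
        lastFlushMs = nowMs;
        return out;
    }
}
```

With a 100 ms interval, updates a=1, a=2, b=5 and a=3 arriving at 10, 20, 30 and 100 ms produce a single flush at 100 ms containing a=3 and b=5, instead of four separate outputs.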


Best,
Wencong

On 2024-07-05 14:19:44, "Yuxin Tan" <tanyuxinw...@gmail.com> wrote:
>Hi, Wencong,
>Thanks for driving the FLIP.
>
>+1 for this FLIP.
>
>I believe these hints will improve the performance in many use cases.
>I only have a minor question about the Idempotence annotation. When
>this annotation is added, how does StreamTask optimize the frequency?
>Does it ensure a single output, or does it merely reduce the frequency
>of the outputs?
>
>Best,
>Yuxin
>
>
>On Mon, Jul 1, 2024 at 16:39, Wencong Liu <liuwencle...@163.com> wrote:
>
>> Hi, Jeyhun,
>> Thanks for the reply.
>>
>>
>> > Integrate these annotations with the execution plan.
>> I believe DataStream is an imperative API, which means that the actual
>> execution plan is largely consistent with the computational logic the
>> user expresses with DataStream. This differs from SQL, so supporting
>> getExecutionPlan may not be very valuable in the short term. If it is
>> supported later, we can consider the impact of hints.
>>
>>
>> > Check for misuse of attributes or ignore it.
>> For illegal use (an annotation on an inappropriate ProcessFunction),
>> an exception will be thrown. For legal use, the framework may still
>> choose to ignore it.
>>
>>
>> > A framework to include attributes.
>> Yes, we will provide a basic framework in the implementation
>> to help developers extend it.
>>
>>
>> Best,
>> Wencong
>>
>>
>> At 2024-06-28 02:06:37, "Jeyhun Karimov" <je.kari...@gmail.com> wrote:
>> >Hi Wencong,
>> >
>> >Thanks for the FLIP. +1 for it.
>> >
>> >Providing hints to users will enable more optimization potential for DSv2.
>> >I have a few questions.
>> >
>> >I think the DSv2 ExecutionEnvironment currently does not support getting
>> >the execution plan (getExecutionPlan()).
>> >Do you plan to integrate these annotations with the execution plan?
>> >
>> >Any plans to check for misuse of attributes? Or any plans for a framework
>> >to implicitly include attributes?
>> >
>> >Also, since we are drawing an analogy with SQL hints: SQL query planners
>> >usually ignore wrong hints and continue with their best plan.
>> >Do we want to consider this approach? Or should we throw an exception
>> >whenever the hint (attribute in this case) is wrong?
>> >
>> >
>> >Regards,
>> >Jeyhun
>> >
>> >
>> >On Thu, Jun 27, 2024 at 7:47 AM Xintong Song <tonysong...@gmail.com> wrote:
>> >
>> >> +1 for this FLIP.
>> >>
>> >> I think this is similar to SQL hints, where users can provide optional
>> >> information to help the engine execute the workload more efficiently.
>> >> Having a unified mechanism for such hints should improve usability
>> >> compared to introducing tons of configuration knobs.
>> >>
>> >> Best,
>> >>
>> >> Xintong
>> >>
>> >>
>> >>
>> >> On Wed, Jun 26, 2024 at 8:09 PM Wencong Liu <liuwencle...@163.com> wrote:
>> >>
>> >> > Hi devs,
>> >> >
>> >> >
>> >> > I'm proposing a new FLIP[1] to introduce the ProcessFunction Attribute in
>> >> > the DataStream API V2. The goal is to optimize job execution by leveraging
>> >> > additional information about users' ProcessFunction logic. The proposal
>> >> > includes the following scenarios where the ProcessFunction Attribute can
>> >> > significantly enhance optimization:
>> >> >
>> >> >
>> >> > Scenario 1: If the framework recognizes that the ProcessFunction outputs
>> >> > data only after all input is received, scheduling of the downstream
>> >> > operators can be deferred until the ProcessFunction is finished, which
>> >> > effectively reduces resource consumption. Ignoring this information could
>> >> > lead to premature scheduling of downstream operators with no data to
>> >> > process. This scenario is addressed and optimized by FLIP-331[2].
>> >> >
>> >> >
>> >> > Scenario 2: For stream processing, where users are only interested in the
>> >> > latest result per key at the current time, the framework can optimize by
>> >> > adjusting the frequency of ProcessFunction outputs. This reduces shuffle
>> >> > data volume and downstream operator workload. If this optimization is
>> >> > ignored, each new input would trigger a new output. This scenario is
>> >> > addressed and optimized by FLIP-365[3].
>> >> >
>> >> >
>> >> > Scenario 3: If a user's ProcessFunction caches neither its inputs nor its
>> >> > outputs, recognizing this can enable object reuse for this data within the
>> >> > OperatorChain, enhancing performance. Without this optimization, data would
>> >> > be copied before being passed to the next operator. This scenario is
>> >> > addressed and optimized by FLIP-329[4].
>> >> >
>> >> >
>> >> > To unify the mechanism for utilizing additional information and optimizing
>> >> > jobs, we propose introducing the ProcessFunction Attribute, represented by
>> >> > Java annotations, which allows users to provide relevant information about
>> >> > their ProcessFunctions. The framework can then use this information to
>> >> > optimize job execution. Importantly, regular job execution remains
>> >> > unaffected whether or not users use this attribute.
>> >> >
>> >> >
>> >> > Looking forward to discussing this FLIP.
>> >> >
>> >> >
>> >> > Best regards,
>> >> > Wencong Liu
>> >> >
>> >> >
>> >> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-466%3A+Introduce+ProcessFunction+Attribute+in+DataStream+API+V2
>> >> > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-331%3A+Support+EndOfStreamTrigger+and+isOutputOnlyAfterEndOfStream+operator+attribute+to+optimize+task+deployment
>> >> > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-365%3A+Introduce+flush+interval+to+adjust+the+interval+of+emitting+results+with+idempotent+semantics
>> >> > [4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-329%3A+Add+operator+attribute+to+specify+support+for+object-reuse
>> >>
>>
