Hi devs,
I'm proposing a new FLIP [1] to introduce the ProcessFunction Attribute in the DataStream API V2. The goal is to optimize job execution by using additional information about the logic of users' ProcessFunctions. The proposal covers the following scenarios, in which the ProcessFunction Attribute can significantly improve optimization:

Scenario 1: If the framework knows that a ProcessFunction emits output only after all of its input has been received, it can defer scheduling the downstream operators until the ProcessFunction has finished, which reduces resource consumption. Without this information, downstream operators may be scheduled prematurely and sit idle with no data to process. This scenario is addressed by FLIP-331 [2].

Scenario 2: In stream processing, when users are only interested in the latest result per key at any given time, the framework can reduce how often the ProcessFunction emits output. This reduces the volume of shuffled data and the workload of downstream operators. Without this optimization, every new input triggers a new output. This scenario is addressed by FLIP-365 [3].

Scenario 3: If a user's ProcessFunction caches neither its inputs nor its outputs, the framework can enable object reuse for that data within the OperatorChain, improving performance. Without this optimization, each record is copied before being passed to the next operator. This scenario is addressed by FLIP-329 [4].

To unify how this additional information is supplied and used for optimization, we propose introducing the ProcessFunction Attribute, expressed as Java annotations that let users declare relevant properties of their ProcessFunctions. The framework can then use these attributes to optimize job execution. Importantly, regular job execution is unaffected whether or not users declare these attributes.

Looking forward to discussing this FLIP with you.
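As a rough illustration of the general pattern (declaring an attribute as an annotation on a function class and letting the framework read it via reflection), here is a minimal Java sketch. Everything in it is hypothetical: the annotation names `OutputOnlyAfterEndOfInput`, `OnlyLatestResultPerKey` and `NoInputOrOutputCaching`, the stand-in `ProcessFunction` interface, and the framework-side check methods are illustrative assumptions, not the API defined in FLIP-466.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class ProcessFunctionAttributeSketch {

    // Hypothetical attribute (Scenario 1): output is emitted only after all input is received.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface OutputOnlyAfterEndOfInput {}

    // Hypothetical attribute (Scenario 2): only the latest result per key matters.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface OnlyLatestResultPerKey {}

    // Hypothetical attribute (Scenario 3): the function caches neither inputs nor outputs.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface NoInputOrOutputCaching {}

    // Hypothetical stand-in for a user function; NOT the real DataStream API V2 interface.
    interface ProcessFunction {}

    @OutputOnlyAfterEndOfInput
    static class GlobalSum implements ProcessFunction {}

    @OnlyLatestResultPerKey
    static class LatestCountPerKey implements ProcessFunction {}

    @NoInputOrOutputCaching
    static class UpperCaseMapper implements ProcessFunction {}

    // A function without attributes keeps the default (unoptimized) behavior.
    static class PlainFunction implements ProcessFunction {}

    // Framework-side check: may downstream deployment be deferred until this function finishes?
    static boolean deferDownstreamScheduling(Class<?> fn) {
        return fn.isAnnotationPresent(OutputOnlyAfterEndOfInput.class);
    }

    // Framework-side check: may per-key results be emitted periodically instead of on every input?
    static boolean canThrottleOutput(Class<?> fn) {
        return fn.isAnnotationPresent(OnlyLatestResultPerKey.class);
    }

    // Framework-side check: may records be passed to the next chained operator without copying?
    static boolean canReuseObjectsInChain(Class<?> fn) {
        return fn.isAnnotationPresent(NoInputOrOutputCaching.class);
    }

    public static void main(String[] args) {
        Class<?>[] functions = {
            GlobalSum.class, LatestCountPerKey.class, UpperCaseMapper.class, PlainFunction.class
        };
        // Print which optimizations each function's declared attributes would permit.
        for (Class<?> fn : functions) {
            System.out.println(fn.getSimpleName()
                    + ": deferDownstream=" + deferDownstreamScheduling(fn)
                    + ", throttleOutput=" + canThrottleOutput(fn)
                    + ", reuseObjects=" + canReuseObjectsInChain(fn));
        }
    }
}
```

Because the attributes are opt-in, an unannotated function such as `PlainFunction` simply gets the default behavior, which matches the point above that regular job execution is unaffected.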
Best regards,
Wencong Liu

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-466%3A+Introduce+ProcessFunction+Attribute+in+DataStream+API+V2
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-331%3A+Support+EndOfStreamTrigger+and+isOutputOnlyAfterEndOfStream+operator+attribute+to+optimize+task+deployment
[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-365%3A+Introduce+flush+interval+to+adjust+the+interval+of+emitting+results+with+idempotent+semantics
[4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-329%3A+Add+operator+attribute+to+specify+support+for+object-reuse