Thanks

Best,
Xintong

On Wed, Jan 10, 2024 at 5:56 PM Xuannan Su <suxuanna...@gmail.com> wrote:
> Hi all,
>
> After several rounds of offline discussions with Xintong and Jinhao, we have decided to narrow the scope of the FLIP. It will now focus on introducing OperatorAttributes that indicate whether an operator emits records only after its inputs have ended. We will also use the attribute to optimize task scheduling for better resource utilization. Setting the backlog status and optimizing the operator implementation during the backlog will be deferred to future work.
>
> In addition to the change above, we have also made the following changes to the FLIP to address the problems mentioned by Dong:
> - Public interfaces are updated to reuse GlobalWindows.
> - Instead of making all outputs of the upstream operators of the "isOutputOnlyAfterEndOfStream=true" operator blocking, we only make the output of the operator with "isOutputOnlyAfterEndOfStream=true" blocking. This can prevent the second problem Dong mentioned. In the future, we may introduce an extra attribute in OperatorAttributes to indicate whether an operator has any side output.
>
> I would greatly appreciate any comments or feedback you may have on the updated FLIP.
>
> Best regards,
> Xuannan
>
> On Tue, Sep 26, 2023 at 11:24 AM Dong Lin <lindon...@gmail.com> wrote:
> >
> > Hi all,
> >
> > Thanks for the review!
> >
> > Becket and I discussed this FLIP offline and agreed on several things that need to be improved in this FLIP. I will summarize our discussion as a list of problems and TODOs. We will update the FLIP and let you know once it is ready for review again.
> >
> > 1) Investigate whether it is possible to update the existing GlobalWindows in a backward-compatible way and reuse it for the same purpose as EndOfStreamWindows, without introducing EndOfStreamWindows as a new class.
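Xuannan's update above says that only the output of the operator with isOutputOnlyAfterEndOfStream=true is made blocking, and the outputs of its upstream operators are left alone. The edge-typing rule can be sketched as follows. This is a minimal model that assumes non-blocking edges are pipelined. OperatorNode, Edge, EdgeType and assignEdgeTypes are hypothetical names used for illustration only, not Flink's scheduler API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model (not Flink classes) of the updated rule: an edge is made
 * blocking only when its producer emits records exclusively after end of input.
 */
final class BlockingEdgeSketch {

    enum EdgeType { PIPELINED, BLOCKING }

    /** An operator; the flag mirrors isOutputOnlyAfterEndOfStream. */
    record OperatorNode(String name, boolean outputOnlyAfterEndOfStream) {}

    /** A directed edge from a producing operator to a consuming operator. */
    record Edge(OperatorNode producer, OperatorNode consumer) {}

    /** An edge together with the type chosen for it. */
    record TypedEdge(Edge edge, EdgeType type) {}

    /**
     * Decides each edge type by looking only at the edge's producer. Edges
     * feeding into a flagged operator stay pipelined, so upstream operators
     * keep running as before.
     */
    static List<TypedEdge> assignEdgeTypes(List<Edge> edges) {
        List<TypedEdge> result = new ArrayList<>();
        for (Edge e : edges) {
            EdgeType type = e.producer().outputOnlyAfterEndOfStream()
                    ? EdgeType.BLOCKING
                    : EdgeType.PIPELINED;
            result.add(new TypedEdge(e, type));
        }
        return result;
    }
}
```

Consider a source, map, aggregate, sink pipeline in which only the aggregate has the flag set. In that case only the aggregate-to-sink edge comes out BLOCKING.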
> >
> > Note that GlobalWindows#getDefaultTrigger returns a NeverTrigger instance, which does not trigger the window's computation even at the end of input. We will need to investigate its existing usage and see whether we can reuse it in a backward-compatible way.
> >
> > 2) Let the JM know whether any operator upstream of the operator with "isOutputOnEOF=true" will emit output via any side channel. The FLIP should update the execution mode of those operators *only if* all outputs from those operators are emitted only at the end of input.
> >
> > More specifically, the upstream operators might include a user-defined operator that emits output directly to an external service. In that case the emission is not expressed as one of the operator's output edges and is thus not visible to the JM. Similarly, a user-defined operator might register a timer via InternalTimerService#registerEventTimeTimer and emit output to an external service inside Triggerable#onEventTime. Users may still need such logic to output data in real time, even if the downstream operators have isOutputOnEOF=true.
> >
> > One possible solution is to add an extra OperatorAttribute that specifies whether an operator might output records in a way that does not go through the operator's output (e.g. side output). The JM can then safely enable the runtime optimization currently described in the FLIP when no such operator exists.
> >
> > 3) Create a follow-up FLIP that allows users to specify whether a source with Boundedness=bounded should have isProcessingBacklog=true.
> >
> > This capability would effectively introduce a third strategy for setting the backlog status, in addition to FLIP-309 and FLIP-328. Note that data in bounded sources is backlog data in most practical use cases, but this is not always true.
> > For example, users might want to start a Flink job that consumes real-time data from a Kafka topic and specify that the job stops after 24 hours. The source is then technically bounded while the data is fresh/real-time.
> >
> > This capability is more generic and can cover more use cases than EndOfStreamWindows. On the other hand, EndOfStreamWindows will still be useful where users already need to specify this window assigner in a DataStream program, because it does not require users to decide whether it is safe to treat data in a bounded source as backlog data.
> >
> > Regards,
> > Dong
> >
> > On Mon, Sep 18, 2023 at 2:56 PM Yuxin Tan <tanyuxinw...@gmail.com> wrote:
> >
> > > Hi, Dong,
> > > Thanks for your efforts.
> > >
> > > +1 to this proposal.
> > > I believe this will improve performance in some scenarios that mix bounded and unbounded workloads.
> > >
> > > Best,
> > > Yuxin
> > >
> > > Xintong Song <tonysong...@gmail.com> wrote on Mon, Sep 18, 2023 at 10:56:
> > >
> > > > Thanks for addressing my comments, Dong.
> > > >
> > > > LGTM.
> > > >
> > > > Best,
> > > >
> > > > Xintong
> > > >
> > > > On Sat, Sep 16, 2023 at 3:34 PM Wencong Liu <liuwencle...@163.com> wrote:
> > > >
> > > > > Hi Dong & Jinhao,
> > > > >
> > > > > Thanks for your clarification! +1
> > > > >
> > > > > Best regards,
> > > > > Wencong
> > > > >
> > > > > At 2023-09-15 11:26:16, "Dong Lin" <lindon...@gmail.com> wrote:
> > > > > > Hi Wencong,
> > > > > >
> > > > > > Thanks for your comments! Please see my reply inline.
> > > > > >
> > > > > > On Thu, Sep 14, 2023 at 12:30 PM Wencong Liu <liuwencle...@163.com> wrote:
> > > > > >
> > > > > > > Dear Dong,
> > > > > > >
> > > > > > > I have thoroughly reviewed the proposal for FLIP-331 and believe it would be a valuable addition to Flink. However, I do have a few questions that I would like to discuss:
> > > > > > >
> > > > > > > 1. FLIP-331 proposes EndOfStreamWindows, which is implemented by a TimeWindow with maxTimestamp = (Long.MAX_VALUE - 1). This naturally allows WindowedStream and AllWindowedStream to process all records belonging to a key in a 'global' window under both the STREAMING and BATCH runtime execution modes.
> > > > > > >
> > > > > > > However, besides coGroup and keyBy().aggregate(), other operators on WindowedStream and AllWindowedStream, such as join and reduce, are currently still implemented based on WindowOperator.
> > > > > > >
> > > > > > > In fact, these operators could also be implemented without WindowOperator, which would avoid the extra cost of invoking WindowAssigner#assignWindows and triggerContext#onElement. Are there plans to support these operators in the future?
> > > > > > >
> > > > > >
> > > > > > You are right. The EndOfStreamWindows proposed in this FLIP can potentially benefit any DataStream API that takes a WindowAssigner as a parameter. This can involve more operations than aggregate and co-group.
> > > > > >
> > > > > > And yes, we plan to take advantage of this API to optimize these operators in the future.
> > > > > > This FLIP focuses on introducing the public APIs and uses aggregate/co-group as the first two examples to showcase the performance benefits.
> > > > > >
> > > > > > I have added an "Analysis of APIs affected by this FLIP" section that lists the DataStream APIs that can benefit from this FLIP. Would this answer your question?
> > > > > >
> > > > > > >
> > > > > > > 2. When using EndOfStreamWindows, upstream operators no longer support checkpointing. This limitation may be too strict, especially for bounded data in the streaming runtime execution mode, where checkpointing can still be useful.
> > > > > > >
> > > > > >
> > > > > > I am not sure we have a good way to support checkpointing while still achieving the performance improvements targeted by this FLIP.
> > > > > >
> > > > > > The issue is that if we support checkpointing, we cannot take advantage of algorithms that are not compatible with checkpoints (e.g. sorting inputs using ExternalSorter). These algorithms are the main reason why batch mode currently significantly outperforms stream mode for aggregation, co-group, etc.
> > > > > >
> > > > > > In most cases where the user does not care about processing latency, it is generally preferable to use batch mode to perform aggregation operations (which should be 10X faster than the existing stream-mode performance) instead of doing checkpoints.
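Dong's point above is that sort-based algorithms make batch mode fast and do not fit with checkpointing. The following is a minimal sketch of an aggregation that emits output only at end of input. It assumes an in-memory list sort as a stand-in for a spilling external sorter. SortThenAggregateSketch, KeyedValue, processElement and endInput are hypothetical names, not Flink's ExternalSorter or operator API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch (not Flink's ExternalSorter): buffer all records, sort
 * them by key once input ends, then sum each key group in a single pass.
 */
final class SortThenAggregateSketch {

    record KeyedValue(String key, long value) {}

    // In-memory stand-in for sorted runs that a real sorter would spill to disk.
    private final List<KeyedValue> buffer = new ArrayList<>();

    /** Called per record; nothing is emitted before end of input. */
    void processElement(KeyedValue record) {
        buffer.add(record);
    }

    /** Called once at end of input; returns one sum per key, in key order. */
    Map<String, Long> endInput() {
        buffer.sort((a, b) -> a.key().compareTo(b.key()));
        Map<String, Long> out = new LinkedHashMap<>();
        String currentKey = null;
        long sum = 0; // only one accumulator is live at a time
        for (KeyedValue r : buffer) {
            if (!r.key().equals(currentKey)) {
                if (currentKey != null) {
                    out.put(currentKey, sum); // previous key group is complete
                }
                currentKey = r.key();
                sum = 0;
            }
            sum += r.value();
        }
        if (currentKey != null) {
            out.put(currentKey, sum); // emit the last key group
        }
        buffer.clear();
        return out;
    }
}
```

The sort buffer in this sketch is never snapshotted. Recovery would therefore re-read the input from the upstream blocking edge or the sources rather than restore a checkpoint.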
> > > > > >
> > > > > > Also note that operators can still perform failover in the same way as in the existing batch-mode execution. There, the intermediate results produced by one operator can be persisted in the shuffle service, and downstream operators can re-read that data from the shuffle service after failover.
> > > > > >
> > > > > > >
> > > > > > > 3. The proposal mentions that if a transformation has isOutputOnEOF == true, the operator as well as its upstream operators will be executed in 'batch mode' with checkpointing disabled. I would like to understand the specific implications of this 'batch mode'. Are there any other changes associated with it?
> > > > > > >
> > > > > >
> > > > > > Good point. We should explicitly mention the changes. I have updated the FLIP to clarify this.
> > > > > >
> > > > > > More specifically, checkpointing is disabled while these operators are running, so that they can perform operations that are not compatible with checkpoints (e.g. sorting inputs). After failover, operators should re-read the data from the upstream blocking edge or from the sources.
> > > > > >
> > > > > > Would this answer your question?
> > > > > >
> > > > > > >
> > > > > > > Additionally, I am curious whether this 'batch mode' conflicts with the 'mixed mode' described in FLIP-327.
> > > > > > > The coGroup and keyBy().aggregate() operators on EndOfStreamWindows have the attribute 'isInternalSorterSupported' set to true, indicating support for the 'mixed mode'. However, they also have isOutputOnEOF set to true, which suggests that the upstream operators should be executed in 'batch mode'. Will the 'mixed mode' be ignored in 'batch mode'? I would appreciate any clarification on this matter.
> > > > > > >
> > > > > >
> > > > > > Good question. I think `isInternalSorterSupported` and `isOutputOnEOF` do not conflict with each other.
> > > > > >
> > > > > > It might be useful to recap the semantics of these attributes:
> > > > > > - `isOutputOnEOF` describes whether an operator outputs data only after all its input has been ingested by the operator.
> > > > > > - `isInternalSorterSupported` describes whether an operator will use an internal sorter when it does not need to do checkpoints.
> > > > > >
> > > > > > From these semantics, the two attributes do not conflict with each other, and valid operators can have any of the four combinations of true/false values for these two attributes.
> > > > > >
> > > > > > In the specific example you described above, let's say isOutputOnEOF = true and isInternalSorterSupported = true. According to FLIP-331, checkpointing is disabled while this operator is running. According to FLIP-327, this operator will sort data internally, which means the Flink runtime should not additionally sort its inputs. So overall the Flink job can comply with the semantics of both attributes consistently.
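The four attribute combinations Dong mentions can be enumerated in a small model. This is a hypothetical sketch under one simplifying assumption: backlog status from FLIP-327/FLIP-328 is ignored, so an operator needs checkpoints exactly when isOutputOnEOF is false. Attributes, Behavior, derive and allCombinations are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of how isOutputOnEOF and isInternalSorterSupported
 * combine. Simplification: backlog status is ignored, so "does not need
 * checkpoints" is taken to mean "isOutputOnEOF is true".
 */
final class AttributeCombinationSketch {

    record Attributes(boolean isOutputOnEOF, boolean isInternalSorterSupported) {}

    record Behavior(boolean checkpointDisabledWhileRunning, boolean operatorSortsInternally) {}

    static Behavior derive(Attributes a) {
        // FLIP-331 (as described in this thread): no checkpoints while an
        // isOutputOnEOF operator is running.
        boolean checkpointDisabled = a.isOutputOnEOF();
        // isInternalSorterSupported: the operator uses its own sorter when it
        // does not need to do checkpoints; the runtime then must not sort again.
        boolean sortsInternally = a.isInternalSorterSupported() && checkpointDisabled;
        return new Behavior(checkpointDisabled, sortsInternally);
    }

    /** All four true/false combinations; each is a valid operator configuration. */
    static List<Attributes> allCombinations() {
        List<Attributes> combos = new ArrayList<>();
        for (boolean eof : new boolean[] {false, true}) {
            for (boolean sorter : new boolean[] {false, true}) {
                combos.add(new Attributes(eof, sorter));
            }
        }
        return combos;
    }
}
```

With both flags true, the model reports that checkpointing is disabled and the operator sorts internally. This matches Dong's example, in which the runtime should not additionally sort the inputs.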
> > > > > >
> > > > > > Thanks again for taking the time to review this FLIP. Please let me know what you think.
> > > > > >
> > > > > > Best regards,
> > > > > > Dong
> > > > > >
> > > > > > > Thank you for taking the time to consider my feedback. I eagerly await your response.
> > > > > > >
> > > > > > > Best regards,
> > > > > > > Wencong Liu
> > > > > > >
> > > > > > > At 2023-09-01 11:21:47, "Dong Lin" <lindon...@gmail.com> wrote:
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > Jinhao (cc'ed) and I are opening this thread to discuss FLIP-331: Support EndOfStreamWindows and isOutputOnEOF operator attribute to optimize task deployment. The design doc can be found at
> > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-331%3A+Support+EndOfStreamWindows++and+isOutputOnEOF+operator+attribute+to+optimize+task+deployment
> > > > > > > >
> > > > > > > > This FLIP introduces the isOutputOnEOF operator attribute, which the JobManager can use to optimize task deployment and resource utilization. In addition, it adds EndOfStreamWindows, which can be used with the DataStream APIs (e.g. cogroup, aggregate) to significantly increase throughput and reduce resource utilization.
> > > > > > > >
> > > > > > > > We would greatly appreciate any comments or feedback you may have on this proposal.
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Dong
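The thread says two things about windows:
- Wencong's first question describes EndOfStreamWindows as a TimeWindow with maxTimestamp = Long.MAX_VALUE - 1.
- Dong's September note says the NeverTrigger returned by GlobalWindows#getDefaultTrigger never fires, even at end of input.

The model below contrasts the two. It is a hypothetical Java sketch, not Flink's TimeWindow or trigger classes. It assumes, as in Flink, that an event-time trigger fires once the watermark reaches the window's maxTimestamp, and that a bounded input ends with a final watermark of Long.MAX_VALUE. Window, END_OF_STREAM_WINDOW, eventTimeTriggerFires and neverTriggerFires are illustrative names.

```java
/**
 * Hypothetical model (not Flink classes) of a single window spanning all of
 * event time, compared with a trigger that never fires.
 */
final class EndOfStreamWindowSketch {

    /** Mirrors a time window: end is exclusive, so maxTimestamp() = end - 1. */
    record Window(long start, long end) {
        long maxTimestamp() {
            return end - 1;
        }
    }

    /** The single window every record is assigned to; maxTimestamp is Long.MAX_VALUE - 1. */
    static final Window END_OF_STREAM_WINDOW = new Window(Long.MIN_VALUE, Long.MAX_VALUE);

    /** Event-time rule: fire once the watermark has reached the window's maxTimestamp. */
    static boolean eventTimeTriggerFires(Window w, long watermark) {
        return watermark >= w.maxTimestamp();
    }

    /** Models a never-firing trigger: the window's computation is never triggered. */
    static boolean neverTriggerFires(Window w, long watermark) {
        return false;
    }
}
```

With realistic event timestamps, watermarks stay far below Long.MAX_VALUE - 1. Under the stated assumption, the window therefore fires only on the final end-of-input watermark. The never-firing trigger produces no result at all, which is why reusing GlobalWindows requires a different trigger.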