Hi Jark, Do you have time to comment on whether the current design looks good?
I plan to start voting in 3 days if there is no follow-up comment. Thanks, Dong On Fri, Sep 15, 2023 at 2:01 PM Jark Wu <imj...@gmail.com> wrote: > Hi Dong, > > > Note that we can not simply enforce the semantics of "any invocation of > > setIsProcessingBacklog(false) will set the job's backlog status to > false". > > Suppose we have a job with two operators, where operatorA invokes > > setIsProcessingBacklog(false) and operatorB invokes > > setIsProcessingBacklog(true). There will be conflict if we use the > > semantics of "any invocation of setIsProcessingBacklog(false) will set > the > > job's backlog status to false". > > So it should set job's backlog status to false if the job has only a single > source, > right? > > Could you elaborate on the behavior if there is a job with a single source, > and the watermark lag exceeds the configured value (should set backlog to > true?), > but the source invokes "setIsProcessingBacklog(false)"? Or the inverse one, > the source invokes "setIsProcessingBacklog(false)" first, but the watermark > lag > exceeds the configured value. > > This is the conflict I'm concerned about. > > Best, > Jark > > On Fri, 15 Sept 2023 at 12:00, Dong Lin <lindon...@gmail.com> wrote: > > > Hi Jark, > > > > Please see my comments inline. > > > > On Fri, Sep 15, 2023 at 10:35 AM Jark Wu <imj...@gmail.com> wrote: > > > > > Hi Dong, > > > > > > Please see my comments inline below. > > > > > > > > Hmm.. can you explain what you mean by "different watermark delay > > > > definitions for each source"? > > > > > > For example, "table1" defines a watermark with delay 5 seconds, > > > "table2" defines a watermark with delay 10 seconds. They have different > > > watermark delay definitions. So it is also reasonable they have > different > > > watermark lag definitions, e.g., "table1" allows "10mins" and "table2" > > > allows "20mins". 
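As a rough illustration of the per-source alternative Jark describes (table1 allowing 10 minutes of lag, table2 allowing 20 minutes), per-source overrides could sit on top of a single job-level default. This is a hypothetical sketch: `LagThresholds` and `thresholdFor` are illustrative names, not Flink APIs, and the thread does not settle whether such overrides will exist.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical sketch, not a Flink API: resolve the watermark lag threshold
// for a source, preferring a per-source override and otherwise falling back
// to the single job-level value.
final class LagThresholds {
    private final Duration jobLevelDefault;
    private final Map<String, Duration> perSourceOverrides;

    LagThresholds(Duration jobLevelDefault, Map<String, Duration> perSourceOverrides) {
        this.jobLevelDefault = jobLevelDefault;
        // Defensive, immutable copy of the overrides.
        this.perSourceOverrides = Map.copyOf(perSourceOverrides);
    }

    // Returns the override for this source if one exists, else the job-level default.
    Duration thresholdFor(String sourceName) {
        return perSourceOverrides.getOrDefault(sourceName, jobLevelDefault);
    }
}
```

With an empty override map, every source shares the job-level value, which corresponds to the simpler configuration argued for in the thread.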
> > > > > > > I think the watermark delay you mentioned above is conceptually / > > fundamentally different from the watermark-lag-threshold proposed in this > > FLIP. > > > > It might be useful to revisit the semantics of these two concepts: > > - watermark delay is used to account for the maximum amount of > out-of-orderness > > that users expect (or are willing to wait for) for records from a given > source. > > - watermark-lag-threshold is used to define when processing latency is no > > longer important (e.g. because data is already stale). > > > > Even though users might expect different out-of-orderness for different > > sources, users do not necessarily have different definitions / thresholds > > for when a record is considered "already stale". > > > > > > > > > > > I think there is probably a misunderstanding here. FLIP-309 does NOT > > > directly > > > > specify when backlog is false. It is intentionally specified in such > a > > > way > > > > that there will not be any conflict between these rules. > > > > > > Do you mean FLIP-309 doesn't allow to specify backlog to be false? > > > Is this mentioned in FLIP-309? This is completely different from what I > > > > > > > Can you explain what you mean by "allow to specify backlog to be false"? > > > > If what you mean is that "can invoke setIsProcessingBacklog(false)", then > > FLIP-309 supports doing this. > > > > If what you mean is that "any invocation of setIsProcessingBacklog(false) > > will set the job's backlog status to false", then FLIP-309 does not > support > > this. I believe the existing Javadoc of this API and FLIP-309 are > > compatible with this explanation. > > > > Note that we can not simply enforce the semantics of "any invocation of > > setIsProcessingBacklog(false) will set the job's backlog status to > false". > > Suppose we have a job with two operators, where operatorA invokes > > setIsProcessingBacklog(false) and operatorB invokes > > setIsProcessingBacklog(true).
There will be a conflict if we use the > > semantics of "any invocation of setIsProcessingBacklog(false) will set > the > > job's backlog status to false". > > > > Would this answer your question? > > > > Best, > > Dong > > > > > > > understand. From the API interface > "ctx.setIsProcessingBacklog(boolean)", > > > it allows users to invoke "setIsProcessingBacklog(false)". And FLIP-309 > > > also says "MySQL CDC source should report isProcessingBacklog=false > > > at the beginning of the changelog stage." If not, maybe we need to > > revisit > > > FLIP-309. > > > > > > > Best, > > > Jark > > > > > > > > > > > > On Fri, 15 Sept 2023 at 08:41, Dong Lin <lindon...@gmail.com> wrote: > > > > > > > Hi Jark, > > > > > > > > Do you have any follow-up comment? > > > > > > > > My gut feeling is that, if we need to support per-source > watermark > > > lag > > > > specification in the future (not sure we have a use-case for this > right > > > > now), we can add such a config in the future with a follow-up FLIP. > The > > > > job-level config will still be useful as it makes users' > configuration > > > > simpler for common scenarios. > > > > > > > > If it is OK, can we agree to make incremental progress for Flink and > > > start > > > > a voting thread for this FLIP? > > > > > > > > Thanks, > > > > Dong > > > > > > > > > > > > On Mon, Sep 11, 2023 at 4:41 PM Jark Wu <imj...@gmail.com> wrote: > > > > > > > > > Hi Dong, > > > > > > > > > > Please see my comments inline. > > > > > > > > > > > As a result, the proposed job-level > > > > > > config will be applied only in the changelog stage. So there is > no > > > > > > difference between these two approaches in this particular case, > > > right? > > > > > > > > > > How can the job-level config be applied ONLY in the changelog > stage? > > > > > I think it is only possible if it is implemented by the CDC source > > > > itself, > > > > > because the framework doesn't know which stage the source is in.
> > > > > Note that the CDC source may emit watermarks with a very small lag > > > > > in the snapshot stage, and the job-level config may turn the > backlog > > > > > status into false. > > > > > > > > > > > On the other hand, per-source config will be necessary if users > > want > > > to > > > > > > apply different watermark lag thresholds for different sources in > > the > > > > > same > > > > > > job. > > > > > > > > > > We also have different watermark delay definitions for each source, > > > > > so I think it is also reasonable and necessary to have different > > > watermark > > > > > lags. > > > > > > > > > > > > > > > > Each source can have its own rule that specifies when the backlog > > can > > > > be > > > > > true > > > > > > (e.g. MySql CDC says the backlog should be true during the > snapshot > > > > > stage). > > > > > > And we can have a job-level config that specifies when the > backlog > > > > should > > > > > > be true. Note that it is designed in such a way that none of > these > > > > rules > > > > > > specify when the backlog should be false. That is why there is no > > > > > conflict > > > > > > by definition. > > > > > > > > > > IIUC, FLIP-309 provides `setIsProcessingBacklog` to specify when > the > > > > > backlog > > > > > is true and when it is FALSE. This conflicts with the job-level config > > as > > > it > > > > > will turn > > > > > the status into true. > > > > > > > > > > > If I understand your comments correctly, you mean that we might > > have > > > a > > > > > > Flink SQL DDL with user-defined watermark expressions. And users > > also > > > > > want > > > > > > to set the backlog to true if the watermark generated by that > > > > > > user-specified expression exceeds a threshold. > > > > > > > > > > No. I mean the source may not support generating watermarks, so the > > > > > watermark > > > > > expression is applied in a following operator (instead of in the > > source > > > > > operator).
> > > > > As a result, the watermark lag threshold doesn't work in this case, which will > > > > confuse > > > > > users. > > > > > > > > > > > You are right that this is a limitation. However, this is only a > > > > > short-term > > > > > > limitation which we added to make sure that we can focus on the > > > > > capability > > > > > > to switch from backlog=true to backlog=false. In the future, we > > will > > > > > remove > > > > > > this limitation and also support switching from backlog=false to > > > > > > backlog=true. > > > > > > > > > > I can understand it may be difficult to support runtime mode > > switching > > > > back > > > > > and forth. > > > > > However, I think this should be a limitation of FLIP-327, not > > FLIP-328. > > > > > IIUC, > > > > > FLIP-309 doesn't have this limitation, right? I just don't > understand > > > > > what the > > > > > challenge is in switching a flag. > > > > > > > > > > Best, > > > > > Jark > > > > > > > > > > > > > > > On Sun, 10 Sept 2023 at 19:44, Dong Lin <lindon...@gmail.com> > wrote: > > > > > > > > > > > Hi Jark, > > > > > > > > > > > > Thanks for the comments. Please see my comments inline. > > > > > > > > > > > > On Sat, Sep 9, 2023 at 4:13 PM Jark Wu <imj...@gmail.com> wrote: > > > > > > > > > > > > > Hi Xuannan, > > > > > > > > > > > > > > I left my comments inline. > > > > > > > > > > > > > > > In the case where a user wants to > > > > > > > > use a CDC source and also determine backlog status based on > > > > watermark > > > > > > > > lag, we still need to define the rule when that occurs > > > > > > > > > > > > > > This rule should be defined by the source itself (which knows > > backlog > > > > > > best), > > > > > > > not by the framework. In the case of a CDC source, it reports > > > > > > isBacklog=true > > > > > > > during the snapshot stage, and reports isBacklog=false during the > > changelog > > > > > stage > > > > > > if > > > > > > > watermark-lag is within the threshold.
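Jark's per-source rule for a CDC source can be written as a small predicate. This is a minimal sketch assuming the source knows whether it is in the snapshot stage and can observe its current watermark lag; `CdcBacklogRule` and its parameters are hypothetical and do not correspond to the MySQL CDC connector's code.

```java
// Hypothetical sketch of the per-source rule described above; not connector code.
final class CdcBacklogRule {
    private CdcBacklogRule() {}

    // Snapshot stage: always backlog. Changelog stage: backlog only while the
    // watermark lag exceeds the threshold (both values in milliseconds).
    static boolean isProcessingBacklog(boolean inSnapshotStage, long watermarkLagMs, long thresholdMs) {
        if (inSnapshotStage) {
            return true;
        }
        return watermarkLagMs > thresholdMs;
    }
}
```

The predicate is equivalent to `inSnapshotStage || watermarkLagMs > thresholdMs`, the logical OR of a source-defined rule and a watermark-lag rule, so under this reading a small lag during the snapshot stage cannot by itself turn the status to false.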
> > > > > > > > > > > > > > > > > > > I am not sure I fully understand the difference between adding a > > > > > job-level > > > > > > config vs. adding a per-source config. > > > > > > > > > > > > In the case of CDC, its watermark lag should be either > undefined > > > or > > > > > > really large in the snapshot stage. As a result, the proposed > > > job-level > > > > > > config will be applied only in the changelog stage. So there is > no > > > > > > difference between these two approaches in this particular case, > > > right? > > > > > > > > > > > > There are two advantages of the job-level config over per-source > > > > config: > > > > > > > > > > > > 1) Configuration is simpler. For example, if a user has a > > Flink > > > > job > > > > > > that consumes records from multiple Kafka sources and wants to > > > > determine > > > > > > backlog status for these Kafka sources using the same watermark > lag > > > > > > threshold, there is no need for users to repeatedly specify this > > > > > threshold > > > > > > for each source. > > > > > > > > > > > > 2) There is a smaller number of public APIs overall. In > particular, > > > > > instead > > > > > > of repeatedly adding a > setProcessingBacklogWatermarkLagThreshold() > > > API > > > > > for > > > > > > every source operator that has event-time watermark lag defined, > we > > > only > > > > > > need to add one job-level config. Fewer public APIs mean better > > > > simplicity > > > > > > and maintainability in general. > > > > > > > > > > > > On the other hand, per-source config will be necessary if users > > want > > > to > > > > > > apply different watermark lag thresholds for different sources in > > the > > > > > same > > > > > > job. Personally, I find this a bit counter-intuitive for users to > > > > specify > > > > > > different watermark lag thresholds in the same job. > > > > > > > > > > > > Do you think there is any real-world use-case that requires this?
> > > Could > > > > > you > > > > > > provide a specific use-case where per-source config can provide > an > > > > > > advantage over the job-level config? > > > > > > > > > > > > > > > > > > > I think it's not intuitive to combine it with the logical OR > > > > operation. > > > > > > > Even for the > > > > > > > combination logic of backlog status from different channels, > > > FLIP-309 > > > > > > said > > > > > > > it is > > > > > > > "up to the operator to determine its output records' isBacklog > > > value" > > > > > and > > > > > > > proposed > > > > > > > 3 different strategies. Therefore, I think backlog status from > a > > > > single > > > > > > > source should > > > > > > > be up to the source. > > > > > > > > > > > > > > > > > > For both the job-level config and the per-source config, it is > > > > eventually > > > > > > up to the user to decide the computation logic of the backlog > > status. > > > > > > Whether this mechanism is implemented at the per-source level or > > > > > framework > > > > > > level is probably more like an implementation detail. > > > > > > > > > > > > Eventually, I think the choice between these two approaches > depends > > > on > > > > > > whether we have any use-case for users to specify different > > watermark > > > > lag > > > > > > thresholds in the same job. > > > > > > > > > > > > > > > > > > > > > > > > > > IMO, a better API design is not how to resolve conflicts but > not > > > > > > > introducing conflicts. > > > > > > > > > > > > > > > > > > Just to clarify, the current FLIP does not introduce any > conflict. > > > Each > > > > > > source can have its own rule that specifies when the backlog can > be > > > > true > > > > > > (e.g. MySql CDC says the backlog should be true during the > snapshot > > > > > stage). > > > > > > And we can have a job-level config that specifies when the > backlog > > > > should > > > > > > be true. 
Note that it is designed in such a way that none of > these > > > > rules > > > > > > specify when the backlog should be false. That is why there is no > > > > > conflict > > > > > > by definition. > > > > > > > > > > > > > > > > > > > > > > > > Letting the source determine backlog status removes the conflicts, > and I > > > > don't > > > > > > > see big > > > > > > > disadvantages. > > > > > > > > > > > > > > > It should not confuse the user that > > > > > > > > DataStream#assignTimestampsAndWatermarks doesn't work with > > > > > > > > backlog.watermark-lag-threshold, as it is not a source. > > > > > > > > > > > > > > Hmm, so this configuration may confuse Flink SQL users, because > > all > > > > > > > watermarks > > > > > > > are defined on the source DDL, but it may use a separate > operator > > > to > > > > > emit > > > > > > > watermarks > > > > > > > if the source doesn't support emitting watermarks. > > > > > > > > > > > > > > > > > > > If I understand your comments correctly, you mean that we might > > have > > > a > > > > > > Flink SQL DDL with user-defined watermark expressions. And users > > also > > > > > want > > > > > > to set the backlog to true if the watermark generated by that > > > > > > user-specified expression exceeds a threshold. > > > > > > > > > > > > That is a good point and use-case. I agree we should also cover > > this > > > > > > scenario. And we can update FLIP-328 to mention that the > job-level > > > > config > > > > > > will also be applicable when the watermark derived from the Flink > > SQL > > > > DDL > > > > > > exceeds this threshold. Would this address your concern? > > > > > > > > > > > > > > > > > > > > > > > > > > > I think the description in the FLIP actually means the other > > way > > > > > > > > around, where the job can never switch back to batch mode > once > > it > > > > has > > > > > > > > switched into streaming mode.
This is to align with the > current > > > > state > > > > > > > > of FLIP-327[1], where only switching from batch to stream > mode > > is > > > > > > > > supported. > > > > > > > > > > > > > > This sounds like a limitation of FLIP-327 (that execution mode > > > > depends > > > > > on > > > > > > > backlog status). > > > > > > > But the backlog status shouldn't have this limitation, because > it > > > is > > > > > not > > > > > > > only used for execution > > > > > > > switching. > > > > > > > > > > > > > > > > > > > You are right that this is a limitation. However, this is only a > > > > > short-term > > > > > > limitation which we added to make sure that we can focus on the > > > > > capability > > > > > > to switch from backlog=true to backlog=false. In the future, we > > will > > > > > remove > > > > > > this limitation and also support switching from backlog=false to > > > > > > backlog=true. > > > > > > > > > > > > The capability to switch from backlog=true to backlog=false will > > > > > mitigate a > > > > > > lot of problems we are facing now, as it is common for users to > > > start a > > > > > > Flink job to process backlog data followed by real-time data. On > > the > > > > > other > > > > > > hand, switching from backlog=false to backlog=true is useful when > > > there > > > > > is > > > > > > a traffic spike while the Flink job is processing real-time data, > > > which > > > > > is > > > > > > also useful to address but less important than the previous one. > > > > > > > > > > > > Given that both features require considerable changes to the > > > underlying > > > > > > runtime, we think it might be useful and safe to tackle them one > by > > > > one. > > > > > > > > > > > > Thanks again for the comments. Please let us know what you think.
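The short-term limitation discussed above, where the watermark-lag rule may move a source from backlog=true to backlog=false but not back, behaves like a one-way latch. The sketch is hypothetical (`OneWayLagRule` is not a Flink class) and assumes the status starts as true and that lag is reported in milliseconds.

```java
// Hypothetical sketch, not Flink code: a watermark-lag rule with the
// one-way (true -> false only) limitation described in the thread.
final class OneWayLagRule {
    private final long thresholdMs;
    private boolean backlog = true; // assumption: the job starts out processing backlog

    OneWayLagRule(long thresholdMs) {
        this.thresholdMs = thresholdMs;
    }

    // Called with each new watermark lag observation; returns the current status.
    boolean onLag(long lagMs) {
        if (backlog && lagMs <= thresholdMs) {
            backlog = false; // switching true -> false is allowed
        }
        // Switching false -> true is deliberately not supported "for now".
        return backlog;
    }
}
```

Replacing the method body with `backlog = lagMs > thresholdMs; return backlog;` would give the two-way behavior that the thread plans to support later.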
> > > > > > > > > > > > > > Best, > > > > > > Dong > > > > > > > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > Jark > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, 8 Sept 2023 at 19:09, Xuannan Su < > suxuanna...@gmail.com> > > > > > wrote: > > > > > > > > > > > > > > > Hi Jark and Leonard, > > > > > > > > > > > > > > > > Thanks for the comments. Please see my reply below. > > > > > > > > > > > > > > > > @Jark > > > > > > > > > > > > > > > > > I think a better API doesn't compete with itself. > Therefore, > > > I'm > > > > in > > > > > > > > favor of > > > > > > > > > supporting the watermark lag threshold for each source > > without > > > > > > > > introducing > > > > > > > > > any framework API and configuration. > > > > > > > > > > > > > > > > I don't think supporting the watermark lag threshold for each > > > > source > > > > > > > > can avoid the competition problem. In the case where a user > > wants > > > > to > > > > > > > > use a CDC source and also determine backlog status based on > > > > watermark > > > > > > > > lag, we still need to define the rule for when that occurs. With > > that > > > > > > > > said, I think it is more intuitive to combine it with the > > logical > > > > OR > > > > > > > > operation, as the strategies (FLIP-309, FLIP-328) only > > determine > > > > when > > > > > > > > the source's backlog status should be True. What do you > think? > > > > > > > > > > > > > > > > > Besides, this can address another concern that the > watermark > > > may > > > > be > > > > > > > > > generated by DataStream#assignTimestampsAndWatermarks which doesn't > > > > > > > > > work with the backlog.watermark-lag-threshold job config > > > > > > > > > > > > > > > > The description of the configuration explicitly states that > "a > > > > source > > > > > > > > would report isProcessingBacklog=true if its watermark lag > > > exceeds > > > > > the > > > > > > > > configured value".
It should not confuse the user that > > > > > > > > DataStream#assignTimestampsAndWatermarks doesn't work with > > > > > > > > backlog.watermark-lag-threshold, as it is not a source. > > > > > > > > > > > > > > > > > Does that mean the job can never go back to streaming mode > once > > > > > it switches > > > > > > > > into > > > > > > > > > backlog mode? It does not sound like a complete FLIP to me. Is > it > > > > > > possible > > > > > > > to > > > > > > > > > support switching back in this FLIP? > > > > > > > > > > > > > > > > I think the description in the FLIP actually means the other > > way > > > > > > > > around, where the job can never switch back to batch mode > once > > it > > > > has > > > > > > > > switched into streaming mode. This is to align with the > current > > > > state > > > > > > > > of FLIP-327[1], where only switching from batch to stream > mode > > is > > > > > > > > supported. > > > > > > > > > > > > > > > > @Leonard > > > > > > > > > > > > > > > > > > The FLIP describes that: And it should report > > > > > > > isProcessingBacklog=false > > > > > > > > at the beginning of the snapshot stage. > > > > > > > > > This should be “changelog stage” > > > > > > > > > > > > > > > > I think the description is in FLIP-309. Thanks for pointing > > it out. > > > I > > > > > > > > updated the description. > > > > > > > > > > > > > > > > > I'm not sure if it's enough to support this feature only in > > > > FLIP-27 > > > > > > > > Source. Although we are pushing for the SourceFunction API to be > > > > removed, > > > > > > > these > > > > > > > > APIs will survive one or two versions in the Flink repo before > > > they > > > > > are > > > > > > > > actually removed. > > > > > > > > > > > > > > > > I agree that it is good to support the SourceFunction API. > > > However, > > > > > > > > given that the SourceFunction API is marked as deprecated, I > > > think > > > > I > > > > > > > > will prioritize supporting the FLIP-27 Source.
We can support > > the > > > > > > > > SourceFunction API after the > > > > > > > > FLIP-27 source. What do you think? > > > > > > > > > > > > > > > > Best regards, > > > > > > > > Xuannan > > > > > > > > > > > > > > > > [1] > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Sep 8, 2023 at 1:02 AM Leonard Xu <xbjt...@gmail.com > > > > > > wrote: > > > > > > > > > > > > > > > > > > Thanks Xuannan for driving this FLIP! > > > > > > > > > > > > > > > > > > The proposal generally looks good to me, but I still left > > some > > > > > > > comments: > > > > > > > > > > > > > > > > > > > One more question about the FLIP is that the FLIP says > > "Note > > > > that > > > > > > > this > > > > > > > > > > config does not support switching source's > > > isProcessingBacklog > > > > > from > > > > > > > > false to true > > > > > > > > > > for now.” Does that mean the job can never go back to > > streaming > > > > mode > > > > > > > once > > > > > > > > it switches into > > > > > > > > > > backlog mode? It does not sound like a complete FLIP to me. > Is > > it > > > > > > > possible > > > > > > > > to > > > > > > > > > > support switching back in this FLIP? > > > > > > > > > +1 for Jark’s concern. IIUC, the state transition of > > > > > > > IsProcessingBacklog > > > > > > > > depends on whether the source is processing > backlog > > > > data > > > > > or > > > > > > > > not. Different sources will have different backlog statuses, > > > which > > > > > may > > > > > > > > change over time. From a general perspective, we should not > > have > > > > this > > > > > > > > restriction.
> > > > > > > > > > > > > > > > > > > The FLIP describes that: And it should report > > > > > > > isProcessingBacklog=false > > > > > > > > at the beginning of the snapshot stage. > > > > > > > > > This should be “changelog stage” > > > > > > > > > > > > > > > > > > I'm not sure if it's enough to support this feature only in > > > > FLIP-27 > > > > > > > > Source. Although we are pushing for the SourceFunction API to be > > > > removed, > > > > > > > these > > > > > > > > APIs will survive one or two versions in the Flink repo before > > > they > > > > > are > > > > > > > > actually removed. > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > Leonard > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > > Jark > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, 7 Sept 2023 at 13:51, Xuannan Su < > > > > suxuanna...@gmail.com> > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > >> Hi all, > > > > > > > > > >> > > > > > > > > > >> Thank you for all the reviews and suggestions. > > > > > > > > > >> > > > > > > > > > >> I believe all the comments have been addressed. If there > > are > > > > no > > > > > > > > > >> further comments, I plan to open the voting thread for > > this > > > > FLIP > > > > > > > early > > > > > > > > > >> next week. > > > > > > > > > >> > > > > > > > > > >> Best regards, > > > > > > > > > >> Xuannan > > > > > > > > > >> > > > > > > > > > >> On Thu, Sep 7, 2023 at 12:09 AM Jing Ge > > > > > > <j...@ververica.com.invalid > > > > > > > > > > > > > > > > > >> wrote: > > > > > > > > > >>> > > > > > > > > > >>> Hi Xuannan, > > > > > > > > > >>> > > > > > > > > > >>> I thought FLIP-328 would compete with FLIP-309 while > > setting > > > > the > > > > > > > > value of > > > > > > > > > >>> the backlog. Understood. Thanks for the hint.
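Jing's worry that FLIP-328 would compete with FLIP-309 when setting the backlog value, like Dong's two-operator example (operatorA invoking setIsProcessingBacklog(false), operatorB invoking setIsProcessingBacklog(true)), comes down to how several reports are merged. The hypothetical `BacklogAggregator` below, which is not Flink code, contrasts a naive "latest call wins" merge, whose result depends on call order, with a logical OR over each operator's latest report, whose result does not.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Flink code: two ways to merge isProcessingBacklog
// reports coming from several operators.
final class BacklogAggregator {
    private final Map<String, Boolean> latestByOperator = new HashMap<>();
    private boolean latestCall = false;

    // Records one setIsProcessingBacklog(isBacklog) call from an operator.
    void report(String operator, boolean isBacklog) {
        latestByOperator.put(operator, isBacklog);
        latestCall = isBacklog;
    }

    // Naive merge: the most recent call decides, so the result depends on ordering.
    boolean latestCallWins() {
        return latestCall;
    }

    // OR merge: backlog if any operator's latest report is true; order-independent.
    boolean anyOperatorBacklogged() {
        return latestByOperator.containsValue(Boolean.TRUE);
    }
}
```

Feeding the same two reports in opposite orders flips `latestCallWins()` but leaves `anyOperatorBacklogged()` at true in both cases.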
> > > > > > > > > >>> > > > > > > > > > >>> Best regards, > > > > > > > > > >>> Jing > > > > > > > > > >>> > > > > > > > > > >>> On Wed, Sep 6, 2023 at 12:12 PM Xuannan Su < > > > > > > suxuanna...@gmail.com> > > > > > > > > > >> wrote: > > > > > > > > > >>> > > > > > > > > > >>>> Hi Jing, > > > > > > > > > >>>> > > > > > > > > > >>>> Thank you for the clarification. > > > > > > > > > >>>> > > > > > > > > > >>>> For the use case you mentioned, I believe we can > utilize > > > the > > > > > > > > > >>>> HybridSource, as updated in FLIP-309[1], to determine > > the > > > > > > backlog > > > > > > > > > >>>> status. For example, if the user wants to process data > > > > before > > > > > > > time T > > > > > > > > > >>>> in batch mode and after time T in stream mode, they > can > > > set > > > > > the > > > > > > > > first > > > > > > > > > >>>> source of the HybridSource to read up to time T and > the > > > last > > > > > > > source > > > > > > > > of > > > > > > > > > >>>> the HybridSource to read from time T. > > > > > > > > > >>>> > > > > > > > > > >>>> Best, > > > > > > > > > >>>> Xuannan > > > > > > > > > >>>> > > > > > > > > > >>>> [1] > > > > > > > > > >>>> > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog > > > > > > > > > >>>> > > > > > > > > > >>>> > > > > > > > > > >>>> On Mon, Sep 4, 2023 at 10:36 PM Jing Ge > > > > > > > <j...@ververica.com.invalid > > > > > > > > > > > > > > > > > > >>>> wrote: > > > > > > > > > >>>>> > > > > > > > > > >>>>> Hi Xuannan, > > > > > > > > > >>>>> > > > > > > > > > >>>>> Thanks for the clarification. > > > > > > > > > >>>>> > > > > > > > > > >>>>> 3. Event time and process time are two different > > things. 
> > > It > > > > > > might > > > > > > > > be > > > > > > > > > >>>> rarely > > > > > > > > > >>>>> used, but conceptually, users can process data in the > > > past > > > > > > > within a > > > > > > > > > >>>>> specific time range in the streaming mode. All data > > > before > > > > > that > > > > > > > > range > > > > > > > > > >>>> will > > > > > > > > > >>>>> be considered backlog and need to be processed > in > > > the > > > > > > batch > > > > > > > > > >> mode, > > > > > > > > > >>>>> like, e.g., the Present Perfect Progressive tense used > > in > > > > > > the English > > > > > > > > > >>>> language. > > > > > > > > > >>>>> > > > > > > > > > >>>>> Best regards, > > > > > > > > > >>>>> Jing > > > > > > > > > >>>>> > > > > > > > > > >>>>> On Thu, Aug 31, 2023 at 4:45 AM Xuannan Su < > > > > > > > suxuanna...@gmail.com> > > > > > > > > > >>>> wrote: > > > > > > > > > >>>>> > > > > > > > > > >>>>>> Hi Jing, > > > > > > > > > >>>>>> > > > > > > > > > >>>>>> Thanks for the reply. > > > > > > > > > >>>>>> > > > > > > > > > >>>>>> 1. You are absolutely right that the watermark lag > > > > threshold > > > > > > > must > > > > > > > > > >> be > > > > > > > > > >>>>>> carefully set with a thorough understanding of > > watermark > > > > > > > > > >> generation. > > > > > > > > > >>>> It is > > > > > > > > > >>>>>> crucial for users to take into account the > > > > WatermarkStrategy > > > > > > > when > > > > > > > > > >>>> setting > > > > > > > > > >>>>>> the watermark lag threshold. > > > > > > > > > >>>>>> > > > > > > > > > >>>>>> 2. Regarding pure processing-time based stream > > > processing > > > > > > jobs, > > > > > > > > > >>>>>> alternative strategies will be implemented to > > determine > > > > > > whether > > > > > > > > the > > > > > > > > > >>>> job is > > > > > > > > > >>>>>> processing backlog data.
I have outlined two > possible > > > > > > strategies > > > > > > > > > >> below: > > > > > > > > > >>>>>> > > > > > > > > > >>>>>> - Based on the source operator's state. For example, > > > when > > > > > > MySQL > > > > > > > > CDC > > > > > > > > > >>>> source > > > > > > > > > >>>>>> is reading the snapshot, it can claim isBacklog=true. > > > > > > > > > >>>>>> - Based on metrics. For example, when > > > busyTimeMsPerSecond > > > > > (or > > > > > > > > > >>>>>> backPressuredTimeMsPerSecond) > > > > user_specified_threshold, > > > > > then > > > > > > > > > >>>>>> isBacklog=true. > > > > > > > > > >>>>>> > > > > > > > > > >>>>>> The strategies proposed in this FLIP rely > on > > > > > > generated > > > > > > > > > >>>>>> watermarks. Therefore, if a user intends for the job > > to > > > > > detect > > > > > > > > > >> backlog > > > > > > > > > >>>>>> status based on watermarks, it is necessary to > generate > > > the > > > > > > > > > >> watermarks. > > > > > > > > > >>>>>> > > > > > > > > > >>>>>> 3. I'm afraid I'm not fully grasping your question. > > From > > > > my > > > > > > > > > >>>> understanding, > > > > > > > > > >>>>>> it should work in both cases. When event times are > > close > > > > to > > > > > > the > > > > > > > > > >>>> processing > > > > > > > > > >>>>>> time, resulting in watermarks close to the > processing > > > > time, > > > > > > the > > > > > > > > > >> job is > > > > > > > > > >>>> not > > > > > > > > > >>>>>> processing backlog data. On the other hand, when > event > > > > times > > > > > > are > > > > > > > > > >> far > > > > > > > > > >>>> from > > > > > > > > > >>>>>> processing time, causing watermarks to also be > > distant, > > > if > > > > > the > > > > > > > lag > > > > > > > > > >>>>>> surpasses the defined threshold, the job is > considered > > > > > > > to be processing > > > > > > > > > >>>> backlog > > > > > > > > > >>>>>> data.
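The metric-based strategy in point 2 can be sketched as a threshold check. This assumes both metric values are already available as plain numbers (reading them from Flink's metric system is left out) and checks both metrics against the same threshold, which is only one reading of the "(or ...)" in the text; `MetricBacklogRule` is a hypothetical name.

```java
// Hypothetical sketch of the metric-based strategy; not Flink code.
final class MetricBacklogRule {
    private final double thresholdMsPerSecond; // stands in for user_specified_threshold

    MetricBacklogRule(double thresholdMsPerSecond) {
        this.thresholdMsPerSecond = thresholdMsPerSecond;
    }

    // Reports backlog when either metric exceeds the user-specified threshold.
    boolean isBacklog(double busyTimeMsPerSecond, double backPressuredTimeMsPerSecond) {
        return busyTimeMsPerSecond > thresholdMsPerSecond
                || backPressuredTimeMsPerSecond > thresholdMsPerSecond;
    }
}
```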
> > > > > > > > > >>>>>> > > > > > > > > > >>>>>> Best, > > > > > > > > > >>>>>> Xuannan > > > > > > > > > >>>>>> > > > > > > > > > >>>>>> > > > > > > > > > >>>>>>> On Aug 31, 2023, at 02:56, Jing Ge > > > > > > <j...@ververica.com.INVALID > > > > > > > > > > > > > > > > > >>>> wrote: > > > > > > > > > >>>>>>> > > > > > > > > > >>>>>>> Hi Xuannan, > > > > > > > > > >>>>>>> > > > > > > > > > >>>>>>> Thanks for the clarification. That is the part > where > > I > > > am > > > > > > > trying > > > > > > > > > >> to > > > > > > > > > >>>>>>> understand your thoughts. I have some follow-up > > > > questions: > > > > > > > > > >>>>>>> > > > > > > > > > >>>>>>> 1. It depends strongly on the watermarkStrategy and > > what > > > > > > > > > >> customized > > > > > > > > > >>>>>>> watermark generation looks like. It mixes business > > > logic > > > > > with > > > > > > > > > >>>> technical > > > > > > > > > >>>>>>> implementation and technical data processing mode. > > The > > > > > value > > > > > > of > > > > > > > > > >> the > > > > > > > > > >>>>>>> watermark lag threshold must be set very carefully. > > If > > > > the > > > > > > > value > > > > > > > > > >> is > > > > > > > > > >>>> too > > > > > > > > > >>>>>>> small, any time the watermark generation > logic > > is > > > > > > > > > >>>> changed (business > > > > > > > > > >>>>>>> logic changes lead to the threshold getting > > exceeded), > > > > the > > > > > > same > > > > > > > > > >> job > > > > > > > > > >>>> might > > > > > > > > > >>>>>>> be running surprisingly in backlog processing mode, > > > i.e. > > > > a > > > > > > > > > >> butterfly > > > > > > > > > >>>>>>> effect. Comprehensive documentation is required > to > > > > avoid > > > > > > any > > > > > > > > > >>>> confusion > > > > > > > > > >>>>>>> for the users. > > > > > > > > > >>>>>>> 2.
Like Jark already mentioned, use cases that do not have watermarks, like pure processing-time based stream processing[1], are not covered. It is more or less a trade-off solution that does not support such use cases, and appropriate documentation is required. Forcing those users to explicitly generate watermarks that are never needed just because of this does not sound like a proper solution.
3. If I am not mistaken, it only works for use cases where event times are very close to processing times, because the wall clock is used to calculate the watermark lag while the watermark is generated based on event time.
Best regards,
Jing

[1] https://github.com/apache/flink/blob/2c50b4e956305426f478b726d4de4a640a16b810/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L236

On Wed, Aug 30, 2023 at 4:06 AM Xuannan Su <suxuanna...@gmail.com> wrote:

Hi Jing,

Thank you for the suggestion.

The definition of watermark lag is the same as the watermarkLag metric in FLIP-33[1]. More specifically, the watermark lag is computed at the time a watermark is emitted downstream, in the following way: watermarkLag = CurrentTime - Watermark. I have added this description to the FLIP.

I hope this addresses your concern.
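The definition above can be expressed as a small, hypothetical helper: the lag is evaluated only when a watermark is emitted, using the wall clock, and is then compared with a user-specified threshold. The class name and the injectable `now_ms` clock are illustrative assumptions, and reading "exceeds the threshold" as a strict `>` is also an assumption.

```python
import time
from typing import Callable, Optional


class WatermarkLagBacklogDetector:
    """Hypothetical helper deriving isProcessingBacklog from watermark lag."""

    def __init__(self, lag_threshold_ms: int,
                 now_ms: Callable[[], int] = lambda: int(time.time() * 1000)):
        self.lag_threshold_ms = lag_threshold_ms
        self.now_ms = now_ms  # injectable wall clock so examples are deterministic
        self.last_lag_ms: Optional[int] = None

    def on_watermark_emitted(self, watermark_ms: int) -> bool:
        # watermarkLag = CurrentTime - Watermark, evaluated when the watermark
        # is emitted downstream; backlog if the lag exceeds the threshold.
        self.last_lag_ms = self.now_ms() - watermark_ms
        return self.last_lag_ms > self.lag_threshold_ms


NOW_MS = 1_700_000_000_000  # fixed "current time" for the example
ONE_HOUR_MS = 60 * 60 * 1000
detector = WatermarkLagBacklogDetector(ONE_HOUR_MS, now_ms=lambda: NOW_MS)

print(detector.on_watermark_emitted(NOW_MS - 5_000))            # False: event time close to now
print(detector.on_watermark_emitted(NOW_MS - 24 * ONE_HOUR_MS))  # True: data is a day old
```

The second call also illustrates Jing's third point: data whose event times stay a day behind the wall clock is reported as backlog regardless of how quickly the job keeps up with it.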
Best,
Xuannan

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics

On Aug 28, 2023, at 01:04, Jing Ge <j...@ververica.com.INVALID> wrote:

Hi Xuannan,

Thanks for the proposal. +1 for me.

There is one tiny thing that I am not sure I understand correctly. Since there will be many different WatermarkStrategies and WatermarkGenerators, could you please update the FLIP and add a description of exactly how the watermark lag is calculated? E.g., watermark lag = A - B, where A is the timestamp of the watermark emitted downstream and B is... (this is the part I am not really sure about after reading the FLIP).
Best regards,
Jing

On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su <suxuanna...@gmail.com> wrote:

Hi Jark,

Thanks for the comments.

I agree that the current solution cannot support jobs that cannot define watermarks. However, after considering the pending-record-based solution, I believe the current solution is superior for the target use case, as it is more intuitive for users. The backlog status gives users the ability to balance between throughput and latency, and making this trade-off decision based on the watermark lag is more intuitive from the user's perspective. For instance, a user can decide that if the job lags behind the current time by more than 1 hour, the result is not usable. In that case, we can optimize for throughput when the data lags behind by more than an hour.
With the pending-record-based solution, it's challenging for users to determine when to optimize for throughput and when to prioritize latency.

Regarding the limitations of the watermark-based solution:

1. The current solution can support jobs with sources that have event time. Users can always define a watermark at the source operator, even if it's not used by downstream operators such as streaming joins and unbounded aggregates.

2. I don't believe it's accurate to say that the watermark lag will keep increasing if no data is generated in Kafka. The watermark lag and backlog status are determined at the moment the watermark is emitted to the downstream operator. If no data is emitted from the source, the watermark lag and backlog status will not be updated.
If a WatermarkStrategy with idleness is used, the source becomes non-backlog when it becomes idle.

3. I think watermark lag is a more intuitive way to determine whether a job is processing backlog data. Pending records face a similar issue. For example, if the source has 1K pending records, those records could span 1 day, 1 hour, or 1 second. If the records span 1 day, it's probably best to optimize for throughput. If they span 1 hour, it depends on the business logic. If they span 1 second, optimizing for latency is likely the better choice.

In summary, I believe the watermark-based solution is a superior choice for the target use case, where watermarks/event time can be defined. Additionally, I haven't come across a scenario that requires low-latency processing and reads from a source that cannot define watermarks.
If we encounter such a use case, we can create another FLIP to address those needs in the future. What do you think?

Best,
Xuannan

On Aug 20, 2023, at 23:27, Jark Wu <imj...@gmail.com> wrote:

Hi Xuannan,

Thanks for opening this discussion.

The current proposal may work in the mentioned watermark cases. However, it does not seem to be a general solution for sources to determine "isProcessingBacklog". From my point of view, there are 3 limitations of the current proposal:
1. It doesn't cover jobs that don't have watermarks/event time defined, for example streaming joins and unbounded aggregates. We may still need to figure out solutions for them.
2.
Watermark lag cannot be trusted, because it increases without bound if no data is generated in Kafka. But in this case, there is no backlog at all.
3. Watermark lag does not reflect the amount of backlog well. Whether the watermark lag is 1 day, 1 hour, or 1 second, there may be only 1 pending record, which means no backlog at all.

Therefore, IMO, the watermark may not be the ideal metric for determining "isProcessingBacklog". What we need is something that reflects the number of records not yet processed by the job. Actually, that is the "pendingRecords" metric proposed in FLIP-33[1], which has been implemented by the Kafka source. Did you consider using the "pendingRecords" metric to determine "isProcessingBacklog"?
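Jark's second point and Xuannan's reply to it earlier in this thread (the status is only recomputed when a watermark is emitted, and an idle source becomes non-backlog) can be contrasted in a small sketch. `BacklogStatusTracker`, its `on_idle` hook, and the initial non-backlog state are hypothetical; how Flink actually wires idleness into the backlog status is not specified here.

```python
from typing import Callable


class BacklogStatusTracker:
    """Hypothetical tracker following the behavior described in the thread."""

    def __init__(self, lag_threshold_ms: int, now_ms: Callable[[], int]):
        self.lag_threshold_ms = lag_threshold_ms
        self.now_ms = now_ms
        self.is_backlog = False  # assumed initial status

    def on_watermark_emitted(self, watermark_ms: int) -> None:
        # The status is recomputed only here. While no data (and therefore no
        # new watermark) arrives, nothing is recomputed, so the wall clock
        # moving on cannot by itself push the status to backlog.
        self.is_backlog = self.now_ms() - watermark_ms > self.lag_threshold_ms

    def on_idle(self) -> None:
        # With an idleness-aware WatermarkStrategy, an idle source is non-backlog.
        self.is_backlog = False


clock = {"now_ms": 1_000_000}
tracker = BacklogStatusTracker(60_000, now_ms=lambda: clock["now_ms"])

tracker.on_watermark_emitted(999_000)   # lag = 1 s
print(tracker.is_backlog)               # False
clock["now_ms"] = 10_000_000            # 2.5 hours pass with no data: no recomputation
print(tracker.is_backlog)               # False
tracker.on_watermark_emitted(100_000)   # lag = 9,900 s while catching up
print(tracker.is_backlog)               # True
tracker.on_idle()
print(tracker.is_backlog)               # False
```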
Best,
Jark

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics

On Tue, 15 Aug 2023 at 12:04, Xintong Song <tonysong...@gmail.com> wrote:

Sounds good to me.

It is true that, if we are introducing the generalized watermark, there will be other watermark-related concepts / configurations that need to be updated anyway.
Best,
Xintong

On Tue, Aug 15, 2023 at 11:30 AM Xuannan Su <suxuanna...@gmail.com> wrote:

Hi Xintong,

Thank you for your suggestion.

After considering the idea of using a general configuration key, I think it may not be a good idea, for the reasons below.

While I agree that using a more general configuration key provides us with the flexibility to switch to other approaches for calculating the lag in the future, the downside is that it may confuse users. We currently have fetchEventTimeLag, emitEventTimeLag, and watermarkLag in the source, and it would not be clear which specific lag we are referring to.
With the potential introduction of the Generalized Watermark mechanism in the future, if I understand correctly, a watermark won't necessarily need to be a timestamp. I am concerned that the general configuration key may not be enough to cover all the use cases, and we will need to introduce a general way to determine the backlog status regardless.

For the reasons above, I prefer introducing the configuration as is, and changing it later through a deprecation or migration process. What do you think?

Best,
Xuannan

On Aug 14, 2023, 14:09 +0800, Xintong Song <tonysong...@gmail.com>, wrote:

Thanks for the explanation.
I wonder if it makes sense not to expose this detail via the configuration option. To be specific, I suggest not mentioning the "watermark" keyword in the configuration key and description.

- From the users' perspective, I think they only need to know that if there's a lag higher than the given threshold, Flink will consider the latency of individual records less important and prioritize throughput over it. They don't really need the details of how the lags are calculated.
- For the internal implementation, I also think using watermark lags is a good idea, for the reasons you've already mentioned. However, it's not the only possible option. Hiding this detail from users would give us the flexibility to switch to other approaches if needed in the future.
- We are currently working on designing the ProcessFunction API (consider it a DataStream API V2). There's an idea to introduce a Generalized Watermark mechanism, where the watermark can basically be anything that needs to travel along the data flow with certain alignment strategies, and the event-time watermark would be one specific case of it. This is still an idea that has not been discussed and agreed on by the community, and we are preparing a FLIP for it. But if we go for it, the concept "watermark-lag-threshold" could be ambiguous.

I do not intend to block the FLIP on this. I'd also be fine with introducing the configuration as is, and changing it later, if needed, with a regular deprecation and migration process. Just making my suggestions.
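Xintong's suggestion of a threshold that does not mention watermarks can be pictured as a small interface: users configure only a lag threshold, and the way lag is measured is an internal, swappable detail. All names here (`LagEstimator`, `WatermarkLag`, `EmitEventTimeLag`, `is_processing_backlog`) are hypothetical, and the second estimator is only loosely inspired by the emitEventTimeLag metric mentioned in this thread; it is not how Flink computes that metric.

```python
from typing import Protocol


class LagEstimator(Protocol):
    """Hypothetical internal abstraction: how "lag" is measured is swappable."""

    def current_lag_ms(self, now_ms: int) -> int: ...


class WatermarkLag:
    """Lag of the latest emitted watermark behind the wall clock."""

    def __init__(self) -> None:
        self.watermark_ms = 0

    def on_watermark(self, watermark_ms: int) -> None:
        self.watermark_ms = watermark_ms

    def current_lag_ms(self, now_ms: int) -> int:
        return now_ms - self.watermark_ms


class EmitEventTimeLag:
    """Alternative: lag of the most recently emitted record's timestamp."""

    def __init__(self) -> None:
        self.last_record_ts_ms = 0

    def on_record(self, ts_ms: int) -> None:
        self.last_record_ts_ms = ts_ms

    def current_lag_ms(self, now_ms: int) -> int:
        return now_ms - self.last_record_ts_ms


def is_processing_backlog(estimator: LagEstimator, now_ms: int,
                          lag_threshold_ms: int) -> bool:
    # The only user-facing knob is a generic lag threshold.
    return estimator.current_lag_ms(now_ms) > lag_threshold_ms


wm = WatermarkLag()
wm.on_watermark(1_000)
rec = EmitEventTimeLag()
rec.on_record(95_000)
print(is_processing_backlog(wm, now_ms=100_000, lag_threshold_ms=60_000))   # True
print(is_processing_backlog(rec, now_ms=100_000, lag_threshold_ms=60_000))  # False
```

The same inputs yield different answers depending on the estimator, which is Xuannan's counterpoint: with fetchEventTimeLag, emitEventTimeLag, and watermarkLag all available, a generic key can leave users unsure which lag the threshold refers to.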
Best,
Xintong

On Mon, Aug 14, 2023 at 12:00 PM Xuannan Su <suxuanna...@gmail.com> wrote:

Hi Xintong,

Thanks for the reply.

I have considered using the timestamps in the records to determine the backlog status, and decided to use the watermark in the end. By definition, a watermark is the indication of time progress in a data stream: it indicates that the stream's event time has progressed to some specific time. On the other hand, the timestamps in the records are usually used to generate the watermark. Therefore, it appears more appropriate and intuitive to calculate the event time lag from the watermark and use it to determine the backlog status.
And by using the watermark, we can easily deal with out-of-order and idle data.

Please let me know if you have further questions.

Best,
Xuannan

On Aug 10, 2023, 20:23 +0800, Xintong Song <tonysong...@gmail.com>, wrote:

Thanks for preparing the FLIP, Xuannan.

+1 in general.

A quick question: could you explain why we are relying on the watermark for emitting the record attribute? Why not use the timestamps in the records? I don't see any concern in using watermarks. Just wondering if there are any deep considerations behind this.
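To illustrate Xuannan's answer above about out-of-order data, the sketch below compares a lag computed from each record's own timestamp with a lag computed from a watermark that trails the highest timestamp seen so far by a fixed out-of-orderness bound. The functions are hypothetical, the watermark rule is only loosely modeled on a bounded-out-of-orderness strategy, and all lags are evaluated at one fixed wall-clock instant to keep the numbers simple.

```python
from typing import List


def record_lags(timestamps: List[int], now_ms: int) -> List[int]:
    # Lag taken from each record's own event timestamp.
    return [now_ms - ts for ts in timestamps]


def watermark_lags(timestamps: List[int], now_ms: int,
                   max_out_of_orderness_ms: int) -> List[int]:
    # Watermark = highest timestamp seen so far minus a fixed bound, so it never
    # moves backwards when a late record arrives.
    lags = []
    max_ts = None
    for ts in timestamps:
        max_ts = ts if max_ts is None else max(max_ts, ts)
        lags.append(now_ms - (max_ts - max_out_of_orderness_ms))
    return lags


TIMESTAMPS = [10_000, 3_000, 11_000, 4_000, 12_000]  # out-of-order event times (ms)
NOW_MS = 12_000       # simplification: every lag is evaluated at this one instant
THRESHOLD_MS = 7_500

print(record_lags(TIMESTAMPS, NOW_MS))            # [2000, 9000, 1000, 8000, 0]
print(watermark_lags(TIMESTAMPS, NOW_MS, 5_000))  # [7000, 7000, 6000, 6000, 5000]
print([lag > THRESHOLD_MS for lag in record_lags(TIMESTAMPS, NOW_MS)])
# [False, True, False, True, False]: flips on every late record
print([lag > THRESHOLD_MS for lag in watermark_lags(TIMESTAMPS, NOW_MS, 5_000)])
# [False, False, False, False, False]
```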
Best,
Xintong

On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su <suxuanna...@gmail.com> wrote:

Hi all,

I am opening this thread to discuss FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag[1]. We had several discussions with Dong Lin about the design; thanks for all the valuable advice.

The FLIP targets the use case where users want to run a Flink job to backfill historical data with high throughput and then continue processing real-time data with low latency.
Building upon the backlog concept introduced in FLIP-309[2], this proposal enables sources to report their status of processing backlog based on the watermark lag.

We would greatly appreciate any comments or feedback you may have on this proposal.

Best,
Xuannan

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog