Hi Jark and Dong,

I'm a bit confused about the difference between the two competing options.
Could one of you elaborate on the difference between:

> 2) The semantics of `#setIsProcessingBacklog(false)` is that it overrides
> the effect of the previous invocation (if any) of
> `#setIsProcessingBacklog(true)` on the given source instance.

and

> 2) The semantics of `#setIsProcessingBacklog(false)` is that the given
> source instance will have backlog=false.

?

Best,
Piotrek

On Thu, 21 Sep 2023 at 15:28, Dong Lin <lindon...@gmail.com> wrote:

> Hi all,
>
> Jark and I discussed this FLIP offline and I will summarize our discussion
> below. It would be great if you could provide your opinion of the proposed
> options.
>
> Regarding the target use-cases:
> - We both agreed that MySQL CDC should have backlog=true when watermarkLag
>   is large during the binlog phase.
> - Dong argued that other streaming sources with watermarkLag defined (e.g.
>   Kafka) should also have backlog=true when watermarkLag is large. The
>   pros/cons discussion below assumes this use-case needs to be supported.
>
> The 1st option is what is currently proposed in FLIP-328, with the
> following key characteristics:
> 1) There is one job-level config (i.e.
>    pipeline.backlog.watermark-lag-threshold) that applies to all sources
>    with the watermarkLag metric defined.
> 2) The semantics of `#setIsProcessingBacklog(false)` is that it overrides
>    the effect of the previous invocation (if any) of
>    `#setIsProcessingBacklog(true)` on the given source instance.
>
> The 2nd option is what Jark proposed in this email thread, with the
> following key characteristics:
> 1) Add source-specific config (both Java API and SQL source property) to
>    every source for which we want to set backlog status based on the
>    watermarkLag metric. For example, we might add separate Java APIs
>    `#setWatermarkLagThreshold` for MySQL CDC source, HybridSource,
>    KafkaSource, PulsarSource etc.
> 2) The semantics of `#setIsProcessingBacklog(false)` is that the given
>    source instance will have backlog=false.
>
> Here are the key pros/cons of these two options.
>
> Cons of the 1st option:
> 1) The semantics of `#setIsProcessingBacklog(false)` is harder to
>    understand for Flink operator developers than the corresponding
>    semantics in option-2.
>
> Cons of the 2nd option:
> 1) More work for end-users. For a job with multiple sources that need to be
>    configured with a watermark lag threshold, users need to specify
>    multiple configs (one for each source) instead of specifying one
>    job-level config.
>
> 2) More work for Flink operator developers. Overall there are more public
>    APIs (one Java API and one SQL property for each source that needs to
>    determine backlog based on watermark) exposed to end users. This also
>    adds more burden for the Flink community to maintain these APIs.
>
> 3) It would be hard (e.g. require backward incompatible API change) to
>    extend the Flink runtime to support job-level config to set watermark
>    strategy in the future (e.g. support the
>    pipeline.backlog.watermark-lag-threshold in option-1). This is because
>    an existing source operator's code might have hardcoded an invocation of
>    `#setIsProcessingBacklog(false)`, which means the backlog status must be
>    set to false, which prevents Flink runtime from setting backlog=true
>    when a new strategy is triggered.
>
> Overall, I am still inclined to choose option-1 because it is more
> extensible and simpler to use in the long term when we want to support/use
> multiple sources whose backlog status can change based on the watermark
> lag. While option-1's `#setIsProcessingBacklog` is a bit harder to
> understand than option-2's, I think this overhead/cost is worthwhile as it
> makes end-users' life easier in the long term.
>
> Jark: thank you for taking the time to review this FLIP. Please feel free
> to comment if I missed anything in the pros/cons above.
>
> Jark and I have not reached agreement on which option is better. It will be
> really helpful if we can get more comments on these options.
>
> Thanks,
> Dong
>
>
> On Tue, Sep 19, 2023 at 11:26 AM Dong Lin <lindon...@gmail.com> wrote:
>
> > Hi Jark,
> >
> > Thanks for the reply. Please see my comments inline.
> >
> > On Tue, Sep 19, 2023 at 10:12 AM Jark Wu <imj...@gmail.com> wrote:
> >
> >> Hi Dong,
> >>
> >> Sorry for the late reply.
> >>
> >> > The rationale is that if there is any strategy that is triggered and
> >> > says backlog=true, then job's backlog should be true. Otherwise, the
> >> > job's backlog status is false.
> >>
> >> I'm quite confused about this. Does that mean, if the source is in the
> >> changelog phase, the source has to continuously invoke
> >> "setIsProcessingBacklog(true)" (in an infinite loop?). Otherwise,
> >> the job's backlog status would be set to false by the framework?
> >>
> >
> > No, the source would not have to continuously invoke
> > setIsProcessingBacklog(true) in an infinite loop.
> >
> > Actually, I am not very sure why there is confusion that "the job's
> > backlog status would be set to false by the framework". Could you explain
> > where that comes from?
> >
> > I guess it might be useful to provide a complete overview of how
> > setIsProcessingBacklog(...) and pipeline.backlog.watermark-lag-threshold
> > work together to determine the overall job's backlog status. Let me
> > explain it below.
> >
> > Here are the semantics/behavior of setIsProcessingBacklog(..):
> > - This method is invoked on a per-source basis.
> > - This method can be invoked multiple times with true/false as its input
> >   parameter.
> > - For a given source, the last invocation of this method overwrites the
> >   effect of earlier invocations of this method.
> > - For a given source, if this method has been invoked at least once and
> >   the last invocation of this method has isProcessingBacklog = true, then
> >   it is guaranteed that the backlog status of this source is set to true.
> >
> > Here are the semantics/behavior of
> > pipeline.backlog.watermark-lag-threshold:
> > - This config is specified at the job level and applies to every source
> >   included in this job.
> > - For a given source, if its watermarkLag metric is available (see
> >   FLIP-33) and watermarkLag > watermark-lag-threshold, then it is
> >   guaranteed that the backlog status of this source is set to true.
> >
> > Here is how the source's backlog status is determined: If a rule
> > specified above says the source's backlog status should be true, then the
> > source's backlog status is set to true. Otherwise, it is set to false.
> >
> > Here is how the job's backlog status is determined: If there exists a
> > source that is currently running and the source's backlog status is set
> > to true, then the job's backlog status is set to true. Otherwise, it is
> > set to false.
> >
> > Hopefully this can help clarify the behavior. If needed, we can update
> > the relevant doc (e.g. setIsProcessingBacklog() Java doc) to make the
> > semantics above clearer for Flink users.
> >
> > And I would be happy to discuss/explain this design offline when you have
> > time.
> >
> > Thanks,
> > Dong
> >
> >
> >>
> >> Best,
> >> Jark
> >>
> >> On Tue, 19 Sept 2023 at 09:13, Dong Lin <lindon...@gmail.com> wrote:
> >>
> >> > Hi Jark,
> >> >
> >> > Do you have time to comment on whether the current design looks good?
> >> >
> >> > I plan to start voting in 3 days if there is no follow-up comment.
> >> >
> >> > Thanks,
> >> > Dong
> >> >
> >> >
> >> > On Fri, Sep 15, 2023 at 2:01 PM Jark Wu <imj...@gmail.com> wrote:
> >> >
> >> > > Hi Dong,
> >> > >
> >> > > > Note that we can not simply enforce the semantics of "any
> >> > > > invocation of setIsProcessingBacklog(false) will set the job's
> >> > > > backlog status to false". Suppose we have a job with two
> >> > > > operators, where operatorA invokes setIsProcessingBacklog(false)
> >> > > > and operatorB invokes setIsProcessingBacklog(true). There will be
> >> > > > conflict if we use the semantics of "any invocation of
> >> > > > setIsProcessingBacklog(false) will set the job's backlog status
> >> > > > to false".
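
The combination rules described above (a source is in backlog if its last `setIsProcessingBacklog` call was `true` OR its watermark lag exceeds the job-level threshold; the job is in backlog if any running source is) can be sketched roughly as below. This is only an illustration of the semantics discussed for option 1, not Flink code: the class `BacklogRules`, the `SourceState` holder, the method names, and the use of milliseconds are all assumptions made for the example.

```java
import java.util.List;

/** Hypothetical model of the backlog rules discussed in this thread; not a Flink API. */
final class BacklogRules {

    /** Snapshot of one source. A null field means "never reported" or "metric unavailable". */
    static final class SourceState {
        final boolean running;
        final Boolean lastReportedBacklog; // argument of the last setIsProcessingBacklog(...) call
        final Long watermarkLagMs;         // watermarkLag metric, if the source defines one

        SourceState(boolean running, Boolean lastReportedBacklog, Long watermarkLagMs) {
            this.running = running;
            this.lastReportedBacklog = lastReportedBacklog;
            this.watermarkLagMs = watermarkLagMs;
        }
    }

    /** A source is in backlog if ANY rule says true; no rule can force it to false. */
    static boolean sourceBacklog(SourceState s, Long thresholdMs) {
        boolean reportedTrue = Boolean.TRUE.equals(s.lastReportedBacklog);
        boolean lagExceeded = thresholdMs != null
                && s.watermarkLagMs != null
                && s.watermarkLagMs > thresholdMs;
        return reportedTrue || lagExceeded;
    }

    /** The job is in backlog if at least one running source is in backlog. */
    static boolean jobBacklog(List<SourceState> sources, Long thresholdMs) {
        for (SourceState s : sources) {
            if (s.running && sourceBacklog(s, thresholdMs)) {
                return true;
            }
        }
        return false;
    }
}
```

Under these assumed rules, a running source whose last call was `setIsProcessingBacklog(false)` but whose lag is 600000 ms against a 300000 ms threshold is still treated as backlog=true, because the `false` call only cancels an earlier `true` call and does not veto the watermark-lag rule.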
> >> > >
> >> > > So it should set job's backlog status to false if the job has only
> >> > > a single source, right?
> >> > >
> >> > > Could you elaborate on the behavior if there is a job with a single
> >> > > source, and the watermark lag exceeds the configured value (should
> >> > > set backlog to true?), but the source invokes
> >> > > "setIsProcessingBacklog(false)"? Or the inverse one, the source
> >> > > invokes "setIsProcessingBacklog(false)" first, but the watermark
> >> > > lag exceeds the configured value.
> >> > >
> >> > > This is the conflict I'm concerned about.
> >> > >
> >> > > Best,
> >> > > Jark
> >> > >
> >> > > On Fri, 15 Sept 2023 at 12:00, Dong Lin <lindon...@gmail.com> wrote:
> >> > >
> >> > > > Hi Jark,
> >> > > >
> >> > > > Please see my comments inline.
> >> > > >
> >> > > > On Fri, Sep 15, 2023 at 10:35 AM Jark Wu <imj...@gmail.com> wrote:
> >> > > >
> >> > > > > Hi Dong,
> >> > > > >
> >> > > > > Please see my comments inline below.
> >> > > > >
> >> > > > > > Hmm.. can you explain what you mean by "different watermark
> >> > > > > > delay definitions for each source"?
> >> > > > >
> >> > > > > For example, "table1" defines a watermark with delay 5 seconds,
> >> > > > > "table2" defines a watermark with delay 10 seconds. They have
> >> > > > > different watermark delay definitions. So it is also reasonable
> >> > > > > they have different watermark lag definitions, e.g., "table1"
> >> > > > > allows "10mins" and "table2" allows "20mins".
> >> > > > >
> >> > > >
> >> > > > I think the watermark delay you mentioned above is conceptually /
> >> > > > fundamentally different from the watermark-lag-threshold proposed
> >> > > > in this FLIP.
> >> > > >
> >> > > > It might be useful to revisit the semantics of these two concepts:
> >> > > > - watermark delay is used to account for the maximum amount of
> >> > > >   out-of-orderliness that users expect (or are willing to wait
> >> > > >   for) for records from a given source.
> >> > > > - watermark-lag-threshold is used to define when processing
> >> > > >   latency is no longer important (e.g. because data is already
> >> > > >   stale).
> >> > > >
> >> > > > Even though users might expect different out-of-orderliness for
> >> > > > different sources, users do not necessarily have different
> >> > > > definitions / thresholds for when a record is considered "already
> >> > > > stale".
> >> > > >
> >> > > >
> >> > > > >
> >> > > > > > I think there is probably misunderstanding here. FLIP-309
> >> > > > > > does NOT directly specify when backlog is false. It is
> >> > > > > > intentionally specified in such a way that there will not be
> >> > > > > > any conflict between these rules.
> >> > > > >
> >> > > > > Do you mean FLIP-309 doesn't allow to specify backlog to be
> >> > > > > false? Is this mentioned in FLIP-309? This is completely
> >> > > > > different from what I
> >> > > > >
> >> > > >
> >> > > > Can you explain what you mean by "allow to specify backlog to be
> >> > > > false"?
> >> > > >
> >> > > > If what you mean is that "can invoke
> >> > > > setIsProcessingBacklog(false)", then FLIP-309 supports doing this.
> >> > > >
> >> > > > If what you mean is that "any invocation of
> >> > > > setIsProcessingBacklog(false) will set the job's backlog status to
> >> > > > false", then FLIP-309 does not support this. I believe the
> >> > > > existing Java doc of this API and FLIP-309 are compatible with
> >> > > > this explanation.
> >> > > >
> >> > > > Note that we can not simply enforce the semantics of "any
> >> > > > invocation of setIsProcessingBacklog(false) will set the job's
> >> > > > backlog status to false". Suppose we have a job with two
> >> > > > operators, where operatorA invokes setIsProcessingBacklog(false)
> >> > > > and operatorB invokes setIsProcessingBacklog(true). There will be
> >> > > > conflict if we use the semantics of "any invocation of
> >> > > > setIsProcessingBacklog(false) will set the job's backlog status to
> >> > > > false".
> >> > > >
> >> > > > Would this answer your question?
> >> > > >
> >> > > > Best,
> >> > > > Dong
> >> > > >
> >> > > >
> >> > > > > understand. From the API interface
> >> > > > > "ctx.setIsProcessingBacklog(boolean)", it allows users to
> >> > > > > invoke "setIsProcessingBacklog(false)". And FLIP-309 also says
> >> > > > > "MySQL CDC source should report isProcessingBacklog=false
> >> > > > > at the beginning of the changelog stage." If not, maybe we need
> >> > > > > to revisit FLIP-309.
> >> > > > >
> >> > > > >
> >> > > > > Best,
> >> > > > > Jark
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > On Fri, 15 Sept 2023 at 08:41, Dong Lin <lindon...@gmail.com>
> >> > > > > wrote:
> >> > > > >
> >> > > > > > Hi Jark,
> >> > > > > >
> >> > > > > > Do you have any follow-up comment?
> >> > > > > >
> >> > > > > > My gut feeling is that if we need to support per-source
> >> > > > > > watermark lag specification in the future (not sure we have a
> >> > > > > > use-case for this right now), we can add such a config with a
> >> > > > > > follow-up FLIP. The job-level config will still be useful as
> >> > > > > > it makes users' configuration simpler for common scenarios.
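
One way to picture how a job-level threshold and a later per-source setting could coexist is a simple fallback lookup: use the per-source value when one is set, otherwise the job-level value. The sketch below is purely hypothetical. FLIP-328 as discussed here proposes only the job-level config; the class `LagThresholds`, its methods, and the source names are invented for illustration, and the 10/20 minute values are borrowed from the "table1"/"table2" example in this thread.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical threshold lookup: per-source override first, job-level value as fallback. */
final class LagThresholds {
    private final Long jobLevelMs; // null means the job-level config is not set
    private final Map<String, Long> perSourceMs = new HashMap<>();

    LagThresholds(Long jobLevelMs) {
        this.jobLevelMs = jobLevelMs;
    }

    /** Registers an override for one source (not part of the FLIP; assumed for this sketch). */
    LagThresholds withSourceOverride(String sourceName, long thresholdMs) {
        perSourceMs.put(sourceName, thresholdMs);
        return this;
    }

    /** Returns the effective threshold for a source, or null if none applies. */
    Long thresholdFor(String sourceName) {
        Long override = perSourceMs.get(sourceName);
        return override != null ? override : jobLevelMs;
    }
}
```

With a job-level value of 10 minutes and an override of 20 minutes for "table2" only, "table1" resolves to the job-level value and "table2" to its own, so users with a single threshold never need to touch per-source settings.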
> >> > > > > > > >> > > > > > If it is OK, can we agree to make incremental progress for > Flink > >> > and > >> > > > > start > >> > > > > > a voting thread for this FLIP? > >> > > > > > > >> > > > > > Thanks, > >> > > > > > Dong > >> > > > > > > >> > > > > > > >> > > > > > On Mon, Sep 11, 2023 at 4:41 PM Jark Wu <imj...@gmail.com> > >> wrote: > >> > > > > > > >> > > > > > > Hi Dong, > >> > > > > > > > >> > > > > > > Please see my comments inline. > >> > > > > > > > >> > > > > > > > As a result, the proposed job-level > >> > > > > > > > config will be applied only in the changelog stage. So > >> there is > >> > > no > >> > > > > > > > difference between these two approaches in this particular > >> > case, > >> > > > > right? > >> > > > > > > > >> > > > > > > How the job-level config can be applied ONLY in the > changelog > >> > > stage? > >> > > > > > > I think it is only possible if it is implemented by the CDC > >> > source > >> > > > > > itself, > >> > > > > > > because the framework doesn't know which stage of the source > >> is. > >> > > > > > > Know that the CDC source may emit watermarks with a very > small > >> > lag > >> > > > > > > in the snapshot stage, and the job-level config may turn the > >> > > backlog > >> > > > > > > status into false. > >> > > > > > > > >> > > > > > > > On the other hand, per-source config will be necessary if > >> users > >> > > > want > >> > > > > to > >> > > > > > > > apply different watermark lag thresholds for different > >> sources > >> > in > >> > > > the > >> > > > > > > same > >> > > > > > > > job. > >> > > > > > > > >> > > > > > > We also have different watermark delay definitions for each > >> > source, > >> > > > > > > I think this's also reasonable and necessary to have > different > >> > > > > watermark > >> > > > > > > lags. 
> >> > > > > > > > >> > > > > > > > >> > > > > > > > Each source can have its own rule that specifies when the > >> > backlog > >> > > > can > >> > > > > > be > >> > > > > > > true > >> > > > > > > > (e.g. MySql CDC says the backlog should be true during the > >> > > snapshot > >> > > > > > > stage). > >> > > > > > > > And we can have a job-level config that specifies when the > >> > > backlog > >> > > > > > should > >> > > > > > > > be true. Note that it is designed in such a way that none > of > >> > > these > >> > > > > > rules > >> > > > > > > > specify when the backlog should be false. That is why > there > >> is > >> > no > >> > > > > > > conflict > >> > > > > > > > by definition. > >> > > > > > > > >> > > > > > > IIUC, FLIP-309 provides `setIsProcessingBacklog` to specify > >> when > >> > > the > >> > > > > > > backlog > >> > > > > > > is true and when is FALSE. This conflicts with the job-level > >> > config > >> > > > as > >> > > > > it > >> > > > > > > will turn > >> > > > > > > the status into true. > >> > > > > > > > >> > > > > > > > If I understand your comments correctly, you mean that we > >> might > >> > > > have > >> > > > > a > >> > > > > > > > Flink SQL DDL with user-defined watermark expressions. And > >> > users > >> > > > also > >> > > > > > > want > >> > > > > > > > to set the backlog to true if the watermark generated by > >> that > >> > > > > > > > user-specified expression exceeds a threshold. > >> > > > > > > > >> > > > > > > No. I mean the source may not support generating watermarks, > >> so > >> > the > >> > > > > > > watermark > >> > > > > > > expression is applied in a following operator (instead of in > >> the > >> > > > source > >> > > > > > > operator). > >> > > > > > > This will result in the watermark lag doesn't work in this > >> case > >> > and > >> > > > > > confuse > >> > > > > > > users. > >> > > > > > > > >> > > > > > > > You are right that this is a limitation. 
However, this is > >> only > >> > a > >> > > > > > > short-term > >> > > > > > > > limitation which we added to make sure that we can focus > on > >> the > >> > > > > > > capability > >> > > > > > > > to switch from backlog=true to backlog=false. In the > >> future, we > >> > > > will > >> > > > > > > remove > >> > > > > > > > this limitation and also support switching from > >> backlog=false > >> > to > >> > > > > > > > backlog=true. > >> > > > > > > > >> > > > > > > I can understand it may be difficult to support runtime mode > >> > > > switching > >> > > > > > back > >> > > > > > > and forth. > >> > > > > > > However, I think this should be a limitation of FLIP-327, > not > >> > > > FLIP-328. > >> > > > > > > IIUC, > >> > > > > > > FLIP-309 doesn't have this limitation, right? I just don't > >> > > understand > >> > > > > > > what's the > >> > > > > > > challenge to switch a flag? > >> > > > > > > > >> > > > > > > Best, > >> > > > > > > Jark > >> > > > > > > > >> > > > > > > > >> > > > > > > On Sun, 10 Sept 2023 at 19:44, Dong Lin < > lindon...@gmail.com> > >> > > wrote: > >> > > > > > > > >> > > > > > > > Hi Jark, > >> > > > > > > > > >> > > > > > > > Thanks for the comments. Please see my comments inline. > >> > > > > > > > > >> > > > > > > > On Sat, Sep 9, 2023 at 4:13 PM Jark Wu <imj...@gmail.com> > >> > wrote: > >> > > > > > > > > >> > > > > > > > > Hi Xuannan, > >> > > > > > > > > > >> > > > > > > > > I leave my comments inline. > >> > > > > > > > > > >> > > > > > > > > > In the case where a user wants to > >> > > > > > > > > > use a CDC source and also determine backlog status > >> based on > >> > > > > > watermark > >> > > > > > > > > > lag, we still need to define the rule when that occurs > >> > > > > > > > > > >> > > > > > > > > This rule should be defined by the source itself (who > >> knows > >> > > > backlog > >> > > > > > > > best), > >> > > > > > > > > not by the framework. 
In the case of CDC source, it > >> reports > >> > > > > > > > isBacklog=true > >> > > > > > > > > during snapshot stage, and report isBacklog=false during > >> > > > changelog > >> > > > > > > stage > >> > > > > > > > if > >> > > > > > > > > watermark-lag is within the threshold. > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > I am not sure I fully understand the difference between > >> adding > >> > a > >> > > > > > > job-level > >> > > > > > > > config vs. adding a per-source config. > >> > > > > > > > > >> > > > > > > > In the case of CDC, its watermark lag should be either > >> > > unde-defined > >> > > > > or > >> > > > > > > > really large in the snapshot stage. As a result, the > >> proposed > >> > > > > job-level > >> > > > > > > > config will be applied only in the changelog stage. So > >> there is > >> > > no > >> > > > > > > > difference between these two approaches in this particular > >> > case, > >> > > > > right? > >> > > > > > > > > >> > > > > > > > There are two advantages of the job-level config over > >> > per-source > >> > > > > > config: > >> > > > > > > > > >> > > > > > > > 1) Configuration is simpler. For example, suppose a user > >> has a > >> > > > Flink > >> > > > > > job > >> > > > > > > > that consumes records from multiple Kafka sources and > wants > >> to > >> > > > > > determine > >> > > > > > > > backlog status for these Kafka sources using the same > >> watermark > >> > > lag > >> > > > > > > > threshold, there is no need for users to repeatedly > specify > >> > this > >> > > > > > > threshold > >> > > > > > > > for each source. > >> > > > > > > > > >> > > > > > > > 2) There is a smaller number of public APIs overall. 
In > >> > > particular, > >> > > > > > > instead > >> > > > > > > > of repeatedly adding a > >> > > setProcessingBacklogWatermarkLagThreshold() > >> > > > > API > >> > > > > > > for > >> > > > > > > > every source operator that has even-time watermark lag > >> defined, > >> > > we > >> > > > > only > >> > > > > > > > need to add one job-level config. Less public API means > >> better > >> > > > > > simplicity > >> > > > > > > > and maintainability in general. > >> > > > > > > > > >> > > > > > > > On the other hand, per-source config will be necessary if > >> users > >> > > > want > >> > > > > to > >> > > > > > > > apply different watermark lag thresholds for different > >> sources > >> > in > >> > > > the > >> > > > > > > same > >> > > > > > > > job. Personally, I find this a bit counter-intuitive for > >> users > >> > to > >> > > > > > specify > >> > > > > > > > different watermark lag thresholds in the same job. > >> > > > > > > > > >> > > > > > > > Do you think there is any real-word use-case that requires > >> > this? > >> > > > > Could > >> > > > > > > you > >> > > > > > > > provide a specific use-case where per-source config can > >> provide > >> > > an > >> > > > > > > > advantage over the job-level config? > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > I think it's not intuitive to combine it with the > logical > >> OR > >> > > > > > operation. > >> > > > > > > > > Even for the > >> > > > > > > > > combination logic of backlog status from different > >> channels, > >> > > > > FLIP-309 > >> > > > > > > > said > >> > > > > > > > > it is > >> > > > > > > > > "up to the operator to determine its output records' > >> > isBacklog > >> > > > > value" > >> > > > > > > and > >> > > > > > > > > proposed > >> > > > > > > > > 3 different strategies. Therefore, I think backlog > status > >> > from > >> > > a > >> > > > > > single > >> > > > > > > > > source should > >> > > > > > > > > be up to the source. 
> >> > > > > > > > > >> > > > > > > > > >> > > > > > > > For both the job-level config and the per-source config, > it > >> is > >> > > > > > eventually > >> > > > > > > > up to the user to decide the computation logic of the > >> backlog > >> > > > status. > >> > > > > > > > Whether this mechanism is implemented at the per-source > >> level > >> > or > >> > > > > > > framework > >> > > > > > > > level is probably more like an implementation detail. > >> > > > > > > > > >> > > > > > > > Eventually, I think the choice between these two > approaches > >> > > depends > >> > > > > on > >> > > > > > > > whether we have any use-case for users to specify > different > >> > > > watermark > >> > > > > > lag > >> > > > > > > > thresholds in the same job. > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > > >> > > > > > > > > IMO, a better API design is not how to resolve conflicts > >> but > >> > > not > >> > > > > > > > > introducing conflicts. > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > Just to clarify, the current FLIP does not introduce any > >> > > conflict. > >> > > > > Each > >> > > > > > > > source can have its own rule that specifies when the > backlog > >> > can > >> > > be > >> > > > > > true > >> > > > > > > > (e.g. MySql CDC says the backlog should be true during the > >> > > snapshot > >> > > > > > > stage). > >> > > > > > > > And we can have a job-level config that specifies when the > >> > > backlog > >> > > > > > should > >> > > > > > > > be true. Note that it is designed in such a way that none > of > >> > > these > >> > > > > > rules > >> > > > > > > > specify when the backlog should be false. That is why > there > >> is > >> > no > >> > > > > > > conflict > >> > > > > > > > by definition. > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > Let the source determine backlog status removes the > >> conflicts > >> > > and I > >> > > > > > don't > >> > > > > > > > > see big > >> > > > > > > > > disadvantages. 
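
As a concrete picture of a source-defined rule such as the MySQL CDC example mentioned above (backlog=true during the snapshot stage, backlog=false from the start of the changelog stage), here is a minimal sketch. It does not use Flink classes: `BacklogReporter` is a hypothetical stand-in for whatever context object exposes `setIsProcessingBacklog(boolean)`, and `ToyCdcSource` with its stage methods is invented for the example.

```java
/** Hypothetical stand-in for the context that exposes setIsProcessingBacklog(boolean). */
interface BacklogReporter {
    void setIsProcessingBacklog(boolean isProcessingBacklog);
}

/** Toy CDC-like source that reports its backlog status only at stage transitions. */
final class ToyCdcSource {
    enum Stage { NOT_STARTED, SNAPSHOT, CHANGELOG }

    private final BacklogReporter reporter;
    private Stage stage = Stage.NOT_STARTED;

    ToyCdcSource(BacklogReporter reporter) {
        this.reporter = reporter;
    }

    /** Snapshot stage reads historical data, so the source reports backlog=true once. */
    void startSnapshot() {
        stage = Stage.SNAPSHOT;
        reporter.setIsProcessingBacklog(true);
    }

    /** Changelog stage reads live changes, so the source reports backlog=false once. */
    void startChangelog() {
        stage = Stage.CHANGELOG;
        reporter.setIsProcessingBacklog(false);
    }

    Stage stage() {
        return stage;
    }
}
```

The source calls the method once per transition rather than repeatedly; whether a later watermark-lag rule may still turn the status back to true is exactly the point under discussion between the two options.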
> >> > > > > > > > > > >> > > > > > > > > > It should not confuse the user that > >> > > > > > > > > > DataStream#assignTimestampsAndWatermarks doesn't work > >> with > >> > > > > > > > > > backlog.watermark-lag-threshold, as it is not a > source. > >> > > > > > > > > > >> > > > > > > > > Hmm, so this configuration may confuse Flink SQL users, > >> > because > >> > > > all > >> > > > > > > > > watermarks > >> > > > > > > > > are defined on the source DDL, but it may use a separate > >> > > operator > >> > > > > to > >> > > > > > > emit > >> > > > > > > > > watermarks > >> > > > > > > > > if the source doesn't support emitting watermarks. > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > If I understand your comments correctly, you mean that we > >> might > >> > > > have > >> > > > > a > >> > > > > > > > Flink SQL DDL with user-defined watermark expressions. And > >> > users > >> > > > also > >> > > > > > > want > >> > > > > > > > to set the backlog to true if the watermark generated by > >> that > >> > > > > > > > user-specified expression exceeds a threshold. > >> > > > > > > > > >> > > > > > > > That is a good point and use-case. I agree we should also > >> cover > >> > > > this > >> > > > > > > > scenario. And we can update FLIP-328 to mention that the > >> > > job-level > >> > > > > > config > >> > > > > > > > will also be applicable when the watermark derived from > the > >> > Flink > >> > > > SQL > >> > > > > > DDL > >> > > > > > > > exceeds this threshold. Would this address your concern? > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > I think the description in the FLIP actually means the > >> > other > >> > > > way > >> > > > > > > > > > around, where the job can never switch back to batch > >> mode > >> > > once > >> > > > it > >> > > > > > has > >> > > > > > > > > > switched into streaming mode. 
This is to align with > the > >> > > current > >> > > > > > state > >> > > > > > > > > > of FLIP-327[1], where only switching from batch to > >> stream > >> > > mode > >> > > > is > >> > > > > > > > > > supported. > >> > > > > > > > > > >> > > > > > > > > This sounds like a limitation of FLIP-327 (that > execution > >> > mode > >> > > > > > depends > >> > > > > > > on > >> > > > > > > > > backlog status). > >> > > > > > > > > But the backlog status shouldn't have this limitation, > >> > because > >> > > it > >> > > > > is > >> > > > > > > not > >> > > > > > > > > only used for execution > >> > > > > > > > > switching. > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > You are right that this is a limitation. However, this is > >> only > >> > a > >> > > > > > > short-term > >> > > > > > > > limitation which we added to make sure that we can focus > on > >> the > >> > > > > > > capability > >> > > > > > > > to switch from backlog=true to backlog=false. In the > >> future, we > >> > > > will > >> > > > > > > remove > >> > > > > > > > this limitation and also support switching from > >> backlog=false > >> > to > >> > > > > > > > backlog=true. > >> > > > > > > > > >> > > > > > > > The capability to switch from backlog=true to > backlog=false > >> > will > >> > > > > > > mitigate a > >> > > > > > > > lot of problems we are facing now. As it is common for > >> users to > >> > > > > start a > >> > > > > > > > Flink job to process backlog data followed by real-time > >> data. > >> > On > >> > > > the > >> > > > > > > other > >> > > > > > > > hand, switching from backlog=false to backlog=true is > useful > >> > when > >> > > > > there > >> > > > > > > is > >> > > > > > > > a traffic spike while the Flink job is processing > real-time > >> > data, > >> > > > > which > >> > > > > > > is > >> > > > > > > > also useful to address but less important than the > previous > >> > one. 
> >> > > > > > > > > >> > > > > > > > Given that both features require considerable changes to > the > >> > > > > underlying > >> > > > > > > > runtime, we think it might be useful and safe to tackle > them > >> > one > >> > > by > >> > > > > > one. > >> > > > > > > > > >> > > > > > > > Thanks again for the comments. Please let us know what you > >> > think. > >> > > > > > > > > >> > > > > > > > Best, > >> > > > > > > > Dong > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > > >> > > > > > > > > Best, > >> > > > > > > > > Jark > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > On Fri, 8 Sept 2023 at 19:09, Xuannan Su < > >> > > suxuanna...@gmail.com> > >> > > > > > > wrote: > >> > > > > > > > > > >> > > > > > > > > > Hi Jark and Leonard, > >> > > > > > > > > > > >> > > > > > > > > > Thanks for the comments. Please see my reply below. > >> > > > > > > > > > > >> > > > > > > > > > @Jark > >> > > > > > > > > > > >> > > > > > > > > > > I think a better API doesn't compete with itself. > >> > > Therefore, > >> > > > > I'm > >> > > > > > in > >> > > > > > > > > > favor of > >> > > > > > > > > > > supporting the watermark lag threshold for each > source > >> > > > without > >> > > > > > > > > > introducing > >> > > > > > > > > > > any framework API and configuration. > >> > > > > > > > > > > >> > > > > > > > > > I don't think supporting the watermark lag threshold > for > >> > each > >> > > > > > source > >> > > > > > > > > > can avoid the competition problem. In the case where a > >> user > >> > > > wants > >> > > > > > to > >> > > > > > > > > > use a CDC source and also determine backlog status > >> based on > >> > > > > > watermark > >> > > > > > > > > > lag, we still need to define the rule when that > occurs. 
> >> > With > >> > > > that > >> > > > > > > > > > said, I think it is more intuitive to combine it with > >> the > >> > > > logical > >> > > > > > OR > >> > > > > > > > > > operation, as the strategies (FLIP-309, FLIP-328) only > >> > > > determine > >> > > > > > when > >> > > > > > > > > > the source's backlog status should be True. What do > you > >> > > think? > >> > > > > > > > > > > >> > > > > > > > > > > Besides, this can address another concern that the > >> > > watermark > >> > > > > may > >> > > > > > be > >> > > > > > > > > > > generated by DataStream#assignTimestampsAnd > >> > > > > > > > > > > Watermarks which doesn't > >> > > > > > > > > > > work with the backlog.watermark-lag-threshold job > >> config > >> > > > > > > > > > > >> > > > > > > > > > The description of the configuration explicitly states > >> that > >> > > "a > >> > > > > > source > >> > > > > > > > > > would report isProcessingBacklog=true if its watermark > >> lag > >> > > > > exceeds > >> > > > > > > the > >> > > > > > > > > > configured value". It should not confuse the user that > >> > > > > > > > > > DataStream#assignTimestampsAndWatermarks doesn't work > >> with > >> > > > > > > > > > backlog.watermark-lag-threshold, as it is not a > source. > >> > > > > > > > > > > >> > > > > > > > > > > Does that mean the job can never back to streaming > >> mode > >> > > once > >> > > > > > > switches > >> > > > > > > > > > into > >> > > > > > > > > > > backlog mode? It sounds like not a complete FLIP to > >> me. > >> > Is > >> > > it > >> > > > > > > > possible > >> > > > > > > > > to > >> > > > > > > > > > > support switching back in this FLIP? > >> > > > > > > > > > > >> > > > > > > > > > I think the description in the FLIP actually means the > >> > other > >> > > > way > >> > > > > > > > > > around, where the job can never switch back to batch > >> mode > >> > > once > >> > > > it > >> > > > > > has > >> > > > > > > > > > switched into streaming mode. 
> This is to align with the current state of FLIP-327[1], where only switching from batch to stream mode is supported.
>
> @Leonard
>
> > > The FLIP describes that: And it should report isProcessingBacklog=false at the beginning of the snapshot stage.
> > This should be "changelog stage"
>
> I think the description is in FLIP-309. Thanks for pointing that out. I have updated the description.
>
> > I'm not sure if it's enough to support this feature only in FLIP-27 Source. Although we are pushing for the SourceFunction API to be removed, these APIs will survive one or two versions in the Flink repo before they are actually removed.
>
> I agree that it is good to support the SourceFunction API. However, given that the SourceFunction API is marked as deprecated, I think I will prioritize supporting the FLIP-27 Source. We can support the SourceFunction API after the FLIP-27 source. What do you think?
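Editorial sketch of the logical-OR rule discussed in this reply: a source counts as processing backlog if any strategy says so, whether that is a source-reported signal in the style of FLIP-309 or a watermark-lag threshold in the style of FLIP-328. All class, method, and variable names below are hypothetical and are not Flink APIs; this is only one way the rule could be expressed.

```java
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical illustration only; none of these names exist in Flink.
final class BacklogOrSketch {

    /** Returns true if at least one strategy currently reports backlog. */
    static boolean isProcessingBacklog(List<BooleanSupplier> strategies) {
        for (BooleanSupplier strategy : strategies) {
            if (strategy.getAsBoolean()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Assumed inputs: the source itself does not claim backlog,
        // but the watermark lag (2 h) exceeds the threshold (1 h).
        boolean reportedBySource = false;
        long watermarkLagMs = 2 * 60 * 60 * 1000L;
        long thresholdMs = 60 * 60 * 1000L;

        List<BooleanSupplier> strategies = List.of(
                () -> reportedBySource,
                () -> watermarkLagMs > thresholdMs);

        System.out.println("isProcessingBacklog = " + isProcessingBacklog(strategies));
    }
}
```

With OR semantics, no strategy can veto another: each one only ever adds reasons for the backlog status to be true, which matches the argument that the strategies "only determine when the source's backlog status should be true".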
> Best regards,
> Xuannan
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data
>
> On Fri, Sep 8, 2023 at 1:02 AM Leonard Xu <xbjt...@gmail.com> wrote:
>
> Thanks Xuannan for driving this FLIP!
>
> The proposal generally looks good to me, but I still left some comments:
>
> > One more question about the FLIP is that the FLIP says "Note that this config does not support switching source's isProcessingBacklog from false to true for now." Does that mean the job can never go back to streaming mode once it switches into backlog mode? It sounds like not a complete FLIP to me. Is it possible to support switching back in this FLIP?
>
> +1 for Jark's concern. IIUC, the state transition of IsProcessingBacklog depends on whether the data in the source is processing backlog data or not.
> Different sources will have different backlog statuses, which may change over time. From a general perspective, we should not have this restriction.
>
> > The FLIP describes that: And it should report isProcessingBacklog=false at the beginning of the snapshot stage.
>
> This should be "changelog stage"
>
> I'm not sure if it's enough to support this feature only in FLIP-27 Source. Although we are pushing for the SourceFunction API to be removed, these APIs will survive one or two versions in the Flink repo before they are actually removed.
>
> Best,
> Leonard
>
> > Best,
> > Jark
>
> On Thu, 7 Sept 2023 at 13:51, Xuannan Su <suxuanna...@gmail.com> wrote:
>
> Hi all,
>
> Thank you for all the reviews and suggestions.
>
> I believe all the comments have been addressed. If there are no further comments, I plan to open the voting thread for this FLIP early next week.
> Best regards,
> Xuannan
>
> On Thu, Sep 7, 2023 at 12:09 AM Jing Ge <j...@ververica.com.invalid> wrote:
>
> Hi Xuannan,
>
> I thought FLIP-328 will compete with FLIP-309 while setting the value of the backlog. Understood. Thanks for the hint.
>
> Best regards,
> Jing
>
> On Wed, Sep 6, 2023 at 12:12 PM Xuannan Su <suxuanna...@gmail.com> wrote:
>
> Hi Jing,
>
> Thank you for the clarification.
>
> For the use case you mentioned, I believe we can utilize the HybridSource, as updated in FLIP-309[1], to determine the backlog status. For example, if the user wants to process data before time T in batch mode and after time T in stream mode, they can set the first source of the HybridSource to read up to time T and the last source of the HybridSource to read from time T.
> Best,
> Xuannan
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
>
> On Mon, Sep 4, 2023 at 10:36 PM Jing Ge <j...@ververica.com.invalid> wrote:
>
> Hi Xuannan,
>
> Thanks for the clarification.
>
> 3. Event time and processing time are two different things. It might be rarely used, but conceptually, users can process data in the past within a specific time range in the streaming mode. All data before that range will be considered as backlog and will need to be processed in the batch mode, like, e.g. the Present Perfect Progressive tense used in the English language.
> Best regards,
> Jing
>
> On Thu, Aug 31, 2023 at 4:45 AM Xuannan Su <suxuanna...@gmail.com> wrote:
>
> Hi Jing,
>
> Thanks for the reply.
>
> 1. You are absolutely right that the watermark lag threshold must be carefully set with a thorough understanding of watermark generation. It is crucial for users to take into account the WatermarkStrategy when setting the watermark lag threshold.
>
> 2. Regarding pure processing-time based stream processing jobs, alternative strategies will be implemented to determine whether the job is processing backlog data. I have outlined two possible strategies below:
>
> - Based on the source operator's state. For example, when the MySQL CDC source is reading the snapshot, it can claim isBacklog=true.
> - Based on metrics. For example, when busyTimeMsPerSecond (or backPressuredTimeMsPerSecond) > user_specified_threshold, then isBacklog=true.
>
> As for the strategy proposed in this FLIP, it relies on generated watermarks. Therefore, if a user intends for the job to detect backlog status based on watermark, it is necessary to generate the watermark.
>
> 3. I'm afraid I'm not fully grasping your question. From my understanding, it should work in both cases. When event times are close to the processing time, resulting in watermarks close to the processing time, the job is not processing backlog data. On the other hand, when event times are far from processing time, causing watermarks to also be distant, if the lag surpasses the defined threshold, the job is considered processing backlog data.
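Editorial sketch of the two alternative strategies outlined in point 2 of this reply (source-state based and metric based). The enum, the method names, and the threshold values are hypothetical; only the metric name busyTimeMsPerSecond and the comparison against a user-specified threshold come from the message itself.

```java
// Hypothetical illustration only; none of these names exist in Flink.
final class AlternativeBacklogSketch {

    /** Assumed two-phase model of a CDC source. */
    enum CdcPhase { SNAPSHOT, CHANGELOG }

    /** State-based strategy: backlog while the source reads the snapshot. */
    static boolean fromSourceState(CdcPhase phase) {
        return phase == CdcPhase.SNAPSHOT;
    }

    /** Metric-based strategy: backlog while the metric exceeds the threshold. */
    static boolean fromMetric(long busyTimeMsPerSecond, long thresholdMsPerSecond) {
        return busyTimeMsPerSecond > thresholdMsPerSecond;
    }

    public static void main(String[] args) {
        System.out.println("snapshot phase -> " + fromSourceState(CdcPhase.SNAPSHOT));
        System.out.println("changelog phase -> " + fromSourceState(CdcPhase.CHANGELOG));
        // Assumed values: busy 950 ms out of every second, threshold 800 ms.
        System.out.println("busy 950 vs 800 -> " + fromMetric(950, 800));
    }
}
```

Neither strategy needs a watermark, which is why they are mentioned as options for pure processing-time jobs.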
> Best,
> Xuannan
>
> On Aug 31, 2023, at 02:56, Jing Ge <j...@ververica.com.INVALID> wrote:
>
> Hi Xuannan,
>
> Thanks for the clarification. That is the part where I am trying to understand your thoughts. I have some follow-up questions:
>
> 1. It depends strongly on the watermarkStrategy and what customized watermark generation looks like. It mixes business logic with technical implementation and technical data processing mode. The value of the watermark lag threshold must be set very carefully. If the value is too small, any time the watermark generation logic is changed (business logic changes lead to the threshold being exceeded), the same job might surprisingly run in backlog processing mode, i.e. a butterfly effect. Comprehensive documentation is required to avoid any confusion for the users.
> 2. Like Jark already mentioned, use cases that do not have watermarks, like pure processing-time based stream processing[1], are not covered. It is more or less a trade-off solution that does not support such use cases, and appropriate documentation is required. Forcing them to explicitly generate watermarks that are never needed just because of this does not sound like a proper solution.
> 3. If I am not mistaken, it only works for use cases where event times are very close to the processing times, because the wall clock is used to calculate the watermark lag and the watermark is generated based on the event time.
> Best regards,
> Jing
>
> [1] https://github.com/apache/flink/blob/2c50b4e956305426f478b726d4de4a640a16b810/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L236
>
> On Wed, Aug 30, 2023 at 4:06 AM Xuannan Su <suxuanna...@gmail.com> wrote:
>
> Hi Jing,
>
> Thank you for the suggestion.
>
> The definition of watermark lag is the same as the watermarkLag metric in FLIP-33[1]. More specifically, the watermark lag is computed at the time when a watermark is emitted downstream in the following way: watermarkLag = CurrentTime - Watermark. I have added this description to the FLIP.
>
> I hope this addresses your concern.
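Editorial sketch of the formula quoted in this reply, watermarkLag = CurrentTime - Watermark, followed by the threshold comparison the FLIP discussion is about. The class and method names are hypothetical, and the one-hour threshold is an assumed example value, not a default taken from the FLIP.

```java
import java.time.Duration;

// Hypothetical illustration only; none of these names exist in Flink.
final class WatermarkLagSketch {

    /** watermarkLag = CurrentTime - Watermark, both in epoch milliseconds. */
    static long watermarkLagMs(long currentTimeMs, long watermarkMs) {
        return currentTimeMs - watermarkMs;
    }

    /** Backlog if the lag, measured when the watermark is emitted, exceeds the threshold. */
    static boolean isBacklog(long currentTimeMs, long watermarkMs, Duration threshold) {
        return watermarkLagMs(currentTimeMs, watermarkMs) > threshold.toMillis();
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // Assumed example: the emitted watermark is two hours behind the wall clock.
        long watermark = now - Duration.ofHours(2).toMillis();
        Duration threshold = Duration.ofHours(1);

        System.out.println("lag (ms) = " + watermarkLagMs(now, watermark));
        System.out.println("backlog  = " + isBacklog(now, watermark, threshold));
    }
}
```

Because the lag is evaluated only when a watermark is emitted, a call site for `isBacklog` would sit in the watermark emission path rather than on a timer.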
> Best,
> Xuannan
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>
> On Aug 28, 2023, at 01:04, Jing Ge <j...@ververica.com.INVALID> wrote:
>
> Hi Xuannan,
>
> Thanks for the proposal. +1 for me.
>
> There is one tiny thing that I am not sure I understand correctly. Since there will be many different WatermarkStrategies and different WatermarkGenerators, could you please update the FLIP and add the description of how exactly the watermark lag is calculated? E.g. Watermark lag = A - B, where A is the timestamp of the watermark emitted to the downstream and B is.... (this is the part I am not really sure about after reading the FLIP).
>
> Best regards,
> Jing
>
> On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su <suxuanna...@gmail.com> wrote:
>
> Hi Jark,
>
> Thanks for the comments.
>
> I agree that the current solution cannot support jobs that cannot define watermarks. However, after considering the pending-record-based solution, I believe the current solution is superior for the target use case as it is more intuitive for users. The backlog status gives users the ability to balance between throughput and latency.
> Making this trade-off decision based on the watermark lag is more intuitive from the user's perspective. For instance, a user can decide that if the job lags behind the current time by more than 1 hour, the result is not usable. In that case, we can optimize for throughput when the data lags behind by more than an hour. With the pending-record-based solution, it's challenging for users to determine when to optimize for throughput and when to prioritize latency.
>
> Regarding the limitations of the watermark-based solution:
>
> 1. The current solution can support jobs with sources that have event time.
> Users can always define a watermark at the source operator, even if it's not used by downstream operators, such as streaming join and unbounded aggregate.
>
> 2. I don't believe it's accurate to say that the watermark lag will keep increasing if no data is generated in Kafka. The watermark lag and backlog status are determined at the moment when the watermark is emitted to the downstream operator. If no data is emitted from the source, the watermark lag and backlog status will not be updated. If the WatermarkStrategy with idleness is used, the source becomes non-backlog when it becomes idle.
>
> 3. I think watermark lag is a more intuitive way to determine whether a job is processing backlog data. The pending-records approach faces a similar issue. For example, if the source has 1K pending records, those records can span 1 day, 1 hour, or 1 second. If the records span 1 day, it's probably best to optimize for throughput. If they span 1 hour, it depends on the business logic. If they span 1 second, optimizing for latency is likely the better choice.
>
> In summary, I believe the watermark-based solution is a superior choice for the target use case where watermark/event time can be defined. Additionally, I haven't come across a scenario that requires low-latency processing and reads from a source that cannot define watermarks.
> If we encounter such a use case, we can create another FLIP to address those needs in the future. What do you think?
>
> Best,
> Xuannan
>
> On Aug 20, 2023, at 23:27, Jark Wu <imj...@gmail.com> wrote:
>
> Hi Xuannan,
>
> Thanks for opening this discussion.
>
> This current proposal may work in the mentioned watermark cases. However, it seems this is not a general solution for sources to determine "isProcessingBacklog". From my point of view, there are 3 limitations of the current proposal:
> 1. It doesn't cover jobs that don't have watermark/event-time defined, for example streaming join and unbounded aggregate.
> We may still need to figure out solutions for them.
> 2. Watermark lag cannot be trusted, because it increases without bound if no data is generated in Kafka. But in this case, there is no backlog at all.
> 3. Watermark lag hardly reflects the amount of backlog. If the watermark lag is 1 day, 1 hour, or 1 second, there is possibly only 1 pending record there, which means no backlog at all.
>
> Therefore, IMO, watermark lag may not be the ideal metric to determine "isProcessingBacklog". What we need is something that reflects the number of records unprocessed by the job. Actually, that is the "pendingRecords" metric proposed in FLIP-33, which has been implemented by the Kafka source.
> Did you consider using the "pendingRecords" metric to determine
> "isProcessingBacklog"?
>
> Best,
> Jark
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>
> On Tue, 15 Aug 2023 at 12:04, Xintong Song <tonysong...@gmail.com> wrote:
>
> Sounds good to me.
>
> It is true that, if we are introducing the generalized watermark, there
> will be other watermark-related concepts / configurations that need to
> be updated anyway.
>
> Best,
>
> Xintong
>
> On Tue, Aug 15, 2023 at 11:30 AM Xuannan Su <suxuanna...@gmail.com> wrote:
>
> Hi Xintong,
>
> Thank you for your suggestion.
>
> After considering the idea of using a general configuration key, I think
> it may not be a good idea for the reasons below.
>
> While I agree that using a more general configuration key provides us
> with the flexibility to switch to other approaches to calculate the lag
> in the future, the downside is that it may cause confusion for users. We
> currently have fetchEventTimeLag, emitEventTimeLag, and watermarkLag in
> the source, and it is not clear which specific lag we are referring to.
> With the potential introduction of the Generalized Watermark mechanism
> in the future, if I understand correctly, a watermark won't necessarily
> need to be a timestamp.
> I am concerned that the general configuration key may not be enough to
> cover all the use cases and we will need to introduce a general way to
> determine the backlog status regardless.
>
> For the reasons above, I prefer introducing the configuration as is, and
> changing it later with a deprecation or migration process. What do you
> think?
>
> Best,
> Xuannan
>
> On Aug 14, 2023, 14:09 +0800, Xintong Song <tonysong...@gmail.com>, wrote:
>
> Thanks for the explanation.
>
> I wonder if it makes sense to not expose this detail via the
> configuration option.
> To be specific, I suggest not mentioning the "watermark" keyword in the
> configuration key and description.
>
> - From the users' perspective, I think they only need to know that when
> there's a lag higher than the given threshold, Flink will consider
> latency of individual records as less important and prioritize
> throughput over it. They don't really need the details of how the lags
> are calculated.
> - For the internal implementation, I also think using watermark lags is
> a good idea, for the reasons you've already mentioned. However, it's not
> the only possible option.
> Hiding this detail from users would give us the flexibility to switch to
> other approaches if needed in future.
> - We are currently working on designing the ProcessFunction API
> (consider it as a DataStream API V2). There's an idea to introduce a
> Generalized Watermark mechanism, where basically the watermark can be
> anything that needs to travel along the data-flow with certain alignment
> strategies, and event time watermark would be one specific case of it.
> This is still an idea and has not been discussed and agreed on by the
> community, and we are preparing a FLIP for it.
> But if we are going for it, the concept "watermark-lag-threshold" could
> be ambiguous.
>
> I do not intend to block the FLIP on this. I'd also be fine with
> introducing the configuration as is, and changing it later, if needed,
> with a regular deprecation and migration process. Just making my
> suggestions.
>
> Best,
>
> Xintong
>
> On Mon, Aug 14, 2023 at 12:00 PM Xuannan Su <suxuanna...@gmail.com> wrote:
>
> Hi Xintong,
>
> Thanks for the reply.
>
> I have considered using the timestamp in the records to determine the
> backlog status, and decided to use the watermark in the end. By
> definition, the watermark is the time progress indication in the data
> stream. It indicates the stream's event time has progressed to some
> specific time. On the other hand, the timestamp in the records is
> usually used to generate the watermark. Therefore, it appears more
> appropriate and intuitive to calculate the event time lag by watermark
> and determine the backlog status.
> And by using the watermark, we can easily deal with out-of-order data
> and idleness.
>
> Please let me know if you have further questions.
>
> Best,
> Xuannan
>
> On Aug 10, 2023, 20:23 +0800, Xintong Song <tonysong...@gmail.com>, wrote:
>
> Thanks for preparing the FLIP, Xuannan.
>
> +1 in general.
>
> A quick question, could you explain why we are relying on the watermark
> for emitting the record attribute? Why not use timestamps in the
> records? I don't see any concern in using watermarks.
> Just wondering if there are any deeper considerations behind this.
>
> Best,
>
> Xintong
>
> On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su <suxuanna...@gmail.com> wrote:
>
> Hi all,
>
> I am opening this thread to discuss FLIP-328: Allow source operators to
> determine isProcessingBacklog based on watermark lag[1]. We had several
> discussions with Dong Lin about the design, and thanks for all the
> valuable advice.
>
> The FLIP targets the use case where users want to run a Flink job to
> backfill historical data in a high-throughput manner and then continue
> processing real-time data with low latency. Building upon the backlog
> concept introduced in FLIP-309[2], this proposal enables sources to
> report their status of processing backlog based on the watermark lag.
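
The idea described in the paragraph above can be sketched in a few lines. This is only an illustration under stated assumptions, not Flink code and not the FLIP's implementation: the names `SourceSnapshot`, `watermark_lag_ms`, and `is_processing_backlog` are hypothetical, the lag is assumed to be wall-clock time minus the current watermark, and "backlog" is assumed to mean "lag strictly greater than the threshold".

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SourceSnapshot:
    """Hypothetical view of one source instance at a point in time."""
    now_ms: int        # wall-clock time, in milliseconds
    watermark_ms: int  # latest watermark emitted by the source, in milliseconds


def watermark_lag_ms(snapshot: SourceSnapshot) -> int:
    # Assumed definition: how far the watermark trails the wall clock.
    # Clamped at zero in case the watermark is ahead of the clock.
    return max(0, snapshot.now_ms - snapshot.watermark_ms)


def is_processing_backlog(snapshot: SourceSnapshot, threshold_ms: int) -> bool:
    # Assumed rule: the source reports backlog while its lag exceeds the threshold.
    return watermark_lag_ms(snapshot) > threshold_ms


if __name__ == "__main__":
    threshold_ms = 5 * 60 * 1000  # illustrative 5-minute threshold

    backfilling = SourceSnapshot(now_ms=10_000_000, watermark_ms=1_000_000)
    caught_up = SourceSnapshot(now_ms=10_000_000, watermark_ms=9_990_000)

    print(is_processing_backlog(backfilling, threshold_ms))
    print(is_processing_backlog(caught_up, threshold_ms))
```

With a rule of this shape, a job would favor throughput while the source reports backlog and switch back to low-latency processing once the lag drops below the threshold.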
>
> We would greatly appreciate any comments or feedback you may have on
> this proposal.
>
> Best,
> Xuannan
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog