Hi Jark and Dong,

I'm a bit confused about the difference between the two competing options.
Could one of you elaborate on the difference between:
> 2) The semantics of `#setIsProcessingBacklog(false)` is that it overrides
> the effect of the previous invocation (if any) of
> `#setIsProcessingBacklog(true)` on the given source instance.

and

> 2) The semantics of `#setIsProcessingBacklog(false)` is that the given
> source instance will have watermarkLag=false.

?
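
To make the question concrete, the rules Dong describes further down in this
thread (the last invocation wins per source, the job-level threshold applies
to sources with a watermarkLag metric, and the results are combined with a
logical OR) could be sketched roughly as below. This is only an illustration
of my reading: `BacklogSketch`, `SourceState`, `sourceBacklog` and
`jobBacklog` are hypothetical names, not Flink APIs, and the real runtime
may well work differently.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical model of the option-1 rules; none of this is a Flink API. */
public class BacklogSketch {

    /** What the (hypothetical) framework knows about one running source. */
    public static final class SourceState {
        // Last value passed to setIsProcessingBacklog; empty if never invoked.
        private Optional<Boolean> lastReported = Optional.empty();
        // Current watermarkLag in ms; empty if the source does not define it.
        private Optional<Long> watermarkLagMs = Optional.empty();

        public void setIsProcessingBacklog(boolean value) {
            lastReported = Optional.of(value); // the last invocation wins
        }

        public void setWatermarkLagMs(long lagMs) {
            watermarkLagMs = Optional.of(lagMs);
        }
    }

    /** A source has backlog=true if any rule says true; otherwise false. */
    public static boolean sourceBacklog(SourceState s, Optional<Long> thresholdMs) {
        boolean reportedTrue = s.lastReported.orElse(false);
        boolean lagTooLarge = thresholdMs.isPresent()
                && s.watermarkLagMs.isPresent()
                && s.watermarkLagMs.get() > thresholdMs.get();
        return reportedTrue || lagTooLarge;
    }

    /** The job has backlog=true if any running source has backlog=true. */
    public static boolean jobBacklog(List<SourceState> running, Optional<Long> thresholdMs) {
        return running.stream().anyMatch(s -> sourceBacklog(s, thresholdMs));
    }
}
```

Under this reading, a source whose last call was
`setIsProcessingBacklog(false)` can still end up with backlog=true when its
lag exceeds the threshold, and as far as I can tell that is the case where
the two options would behave differently.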

Best,
Piotrek

On Thu, 21 Sep 2023 at 15:28, Dong Lin <lindon...@gmail.com> wrote:

> Hi all,
>
> Jark and I discussed this FLIP offline and I will summarize our discussion
> below. It would be great if you could provide your opinion of the proposed
> options.
>
> Regarding the target use-cases:
> - We both agreed that MySQL CDC should have backlog=true when watermarkLag
> is large during the binlog phase.
> - Dong argued that other streaming sources with watermarkLag defined (e.g.
> Kafka) should also have backlog=true when watermarkLag is large. The
> pros/cons discussion below assumes this use-case needs to be supported.
>
> The 1st option is what is currently proposed in FLIP-328, with the
> following key characteristics:
> 1) There is one job-level config (i.e.
> pipeline.backlog.watermark-lag-threshold) that applies to all sources with
> watermarkLag metric defined.
> 2) The semantics of `#setIsProcessingBacklog(false)` is that it overrides
> the effect of the previous invocation (if any) of
> `#setIsProcessingBacklog(true)` on the given source instance.
>
> The 2nd option is what Jark proposed in this email thread, with the
> following key characteristics:
> 1) Add source-specific config (both Java API and SQL source property) to
> every source for which we want to set backlog status based on the
> watermarkLag metric. For example, we might add separate Java APIs
> `#setWatermarkLagThreshold` for MySQL CDC source, HybridSource,
> KafkaSource, PulsarSource etc.
> 2) The semantics of `#setIsProcessingBacklog(false)` is that the given
> source instance will have watermarkLag=false.
>
> Here are the key pros/cons of these two options.
>
> Cons of the 1st option:
> 1) The semantics of `#setIsProcessingBacklog(false)` is harder to
> understand for Flink operator developers than the corresponding semantics
> in option-2.
>
> Cons of the 2nd option:
> 1) More work for end-users. For a job with multiple sources that need to be
> configured with a watermark lag threshold, users need to specify multiple
> configs (one for each source) instead of specifying one job-level config.
>
> 2) More work for Flink operator developers. Overall there are more public
> APIs (one Java API and one SQL property for each source that needs to
> determine backlog based on watermark) exposed to end users. This also adds
> more burden for the Flink community to maintain these APIs.
>
> 3) It would be hard (e.g. require backward incompatible API change) to
> extend the Flink runtime to support job-level config to set watermark
> strategy in the future (e.g. support the
> pipeline.backlog.watermark-lag-threshold in option-1). This is because an
> existing source operator's code might have hardcoded an invocation of
> `#setIsProcessingBacklog(false)`, which means the backlog status must be
> set to false, which prevents Flink runtime from setting backlog=true when a
> new strategy is triggered.
>
> Overall, I am still inclined to choose option-1 because it is more
> extensible and simpler to use in the long term when we want to support/use
> multiple sources whose backlog status can change based on the watermark
> lag. While option-1's `#setIsProcessingBacklog` is a bit harder to
> understand than option-2, I think this overhead/cost is worthwhile as it
> makes end-users' life easier in the long term.
>
> Jark: thank you for taking the time to review this FLIP. Please feel free
> to comment if I missed anything in the pros/cons above.
>
> Jark and I have not reached agreement on which option is better. It will be
> really helpful if we can get more comments on these options.
>
> Thanks,
> Dong
>
>
> On Tue, Sep 19, 2023 at 11:26 AM Dong Lin <lindon...@gmail.com> wrote:
>
> > Hi Jark,
> >
> > Thanks for the reply. Please see my comments inline.
> >
> > On Tue, Sep 19, 2023 at 10:12 AM Jark Wu <imj...@gmail.com> wrote:
> >
> >> Hi Dong,
> >>
> >> Sorry for the late reply.
> >>
> >> > The rationale is that if there is any strategy that is triggered and
> >> says
> >> > backlog=true, then job's backlog should be true. Otherwise, the job's
> >> > backlog status is false.
> >>
> >> I'm quite confused about this. Does that mean, if the source is in the
> >> changelog phase, the source has to continuously invoke
> >> "setIsProcessingBacklog(true)" (in an infinite loop?). Otherwise,
> >> the job's backlog status would be set to false by the framework?
> >>
> >
> > No, the source would not have to continuously invoke
> > setIsProcessingBacklog(true) in an infinite loop.
> >
> > Actually, I am not very sure why there is confusion that "the job's
> > backlog status would be set to false by the framework". Could you explain
> > where that comes from?
> >
> > I guess it might be useful to provide a complete overview of how
> > setIsProcessingBacklog(...) and pipeline.backlog.watermark-lag-threshold
> > work together to determine the overall job's backlog status. Let me
> > explain it below.
> >
> > Here are the semantics/behavior of setIsProcessingBacklog(..):
> > - This method is invoked on a per-source basis.
> > - This method can be invoked multiple times with true/false as its input
> > parameter.
> > - For a given source, the last invocation of this method overwrites the
> > effect of earlier invocations of this method.
> > - For a given source, if this method has been invoked at least once and
> > the last invocation of this method has isProcessingBacklog = true, then
> > it is guaranteed that the backlog status of this source is set to true.
> >
> > Here are the semantics/behavior of
> > pipeline.backlog.watermark-lag-threshold:
> > - This config is specified at the job level and applies to every source
> > included in this job.
> > - For a given source, if its watermarkLag metric is available (see
> > FLIP-33) and watermarkLag > watermark-lag-threshold, then it is
> > guaranteed that the backlog status of this source is set to true.
> >
> > Here is how the source's backlog status is determined: If a rule
> > specified above says the source's backlog status should be true, then the
> > source's backlog status is set to true. Otherwise, it is set to false.
> >
> > Here is how the job's backlog status is determined: If there exists a
> > source that is currently running and the source's backlog status is set
> > to true, then the job's backlog status is set to true. Otherwise, it is
> > set to false.
> >
> > Hopefully this can help clarify the behavior. If needed, we can update
> > the relevant doc (e.g. setIsProcessingBacklog() Java doc) to make the
> > semantics above clearer for Flink users.
> >
> > And I would be happy to discuss/explain this design offline when you have
> > time.
> >
> > Thanks,
> > Dong
> >
> >
> >>
> >> Best,
> >> Jark
> >>
> >> On Tue, 19 Sept 2023 at 09:13, Dong Lin <lindon...@gmail.com> wrote:
> >>
> >> > Hi Jark,
> >> >
> >> > Do you have time to comment on whether the current design looks good?
> >> >
> >> > I plan to start voting in 3 days if there is no follow-up comment.
> >> >
> >> > Thanks,
> >> > Dong
> >> >
> >> >
> >> > On Fri, Sep 15, 2023 at 2:01 PM Jark Wu <imj...@gmail.com> wrote:
> >> >
> >> > > Hi Dong,
> >> > >
> >> > > > Note that we can not simply enforce the semantics of "any
> >> invocation of
> >> > > > setIsProcessingBacklog(false) will set the job's backlog status to
> >> > > false".
> >> > > > Suppose we have a job with two operators, where operatorA invokes
> >> > > > setIsProcessingBacklog(false) and operatorB invokes
> >> > > > setIsProcessingBacklog(true). There will be conflict if we use the
> >> > > > semantics of "any invocation of setIsProcessingBacklog(false) will
> >> set
> >> > > the
> >> > > > job's backlog status to false".
> >> > >
> >> > > So it should set job's backlog status to false if the job has only a
> >> > single
> >> > > source,
> >> > > right?
> >> > >
> >> > > Could you elaborate on the behavior if there is a job with a single
> >> > source,
> >> > > and the watermark lag exceeds the configured value (should set
> >> backlog to
> >> > > true?),
> >> > > but the source invokes "setIsProcessingBacklog(false)"? Or the
> inverse
> >> > one,
> >> > > the source invokes "setIsProcessingBacklog(false)" first, but the
> >> > watermark
> >> > > lag
> >> > > exceeds the configured value.
> >> > >
> >> > > This is the conflict I'm concerned about.
> >> > >
> >> > > Best,
> >> > > Jark
> >> > >
> >> > > On Fri, 15 Sept 2023 at 12:00, Dong Lin <lindon...@gmail.com>
> wrote:
> >> > >
> >> > > > Hi Jark,
> >> > > >
> >> > > > Please see my comments inline.
> >> > > >
> >> > > > On Fri, Sep 15, 2023 at 10:35 AM Jark Wu <imj...@gmail.com>
> wrote:
> >> > > >
> >> > > > > Hi Dong,
> >> > > > >
> >> > > > > Please see my comments inline below.
> >> > > >
> >> > > >
> >> > > > > > Hmm.. can you explain what you mean by "different watermark
> >> delay
> >> > > > > > definitions for each source"?
> >> > > > >
> >> > > > > For example, "table1" defines a watermark with delay 5 seconds,
> >> > > > > "table2" defines a watermark with delay 10 seconds. They have
> >> > different
> >> > > > > watermark delay definitions. So it is also reasonable they have
> >> > > different
> >> > > > > watermark lag definitions, e.g., "table1" allows "10mins" and
> >> > "table2"
> >> > > > > allows "20mins".
> >> > > > >
> >> > > >
> >> > > > I think the watermark delay you mentioned above is conceptually /
> >> > > > fundamentally different from the watermark-lag-threshold proposed
> in
> >> > this
> >> > > > FLIP.
> >> > > >
> >> > > > It might be useful to revisit the semantics of these two concepts:
> >> > > > - watermark delay is used to account for the maximum amount of
> >> > > > out-of-orderliness that users expect (or are willing to wait for)
> >> > > > for records from a given source.
> >> > > > - watermark-lag-threshold is used to define when processing latency
> >> > > > is no longer important (e.g. because data is already stale).
> >> > > >
> >> > > > Even though users might expect different out-of-orderliness for
> >> > > > different sources, users do not necessarily have different
> >> > > > definitions / thresholds for when a record is considered "already
> >> > > > stale".
> >> > > >
> >> > > >
> >> > > > >
> >> > > > > > I think there is probably misunderstanding here. FLIP-309 does
> >> NOT
> >> > > > > directly
> >> > > > > > specify when backlog is false. It is intentionally specified
> in
> >> > such
> >> > > a
> >> > > > > way
> >> > > > > > that there will  not be any conflict between these rules.
> >> > > > >
> >> > > > > Do you mean FLIP-309 doesn't allow to specify backlog to be
> false?
> >> > > > > Is this mentioned in FLIP-309? This is completely different from
> >> > what I
> >> > > > >
> >> > > >
> >> > > > Can you explain what you mean by "allow to specify backlog to be
> >> > false"?
> >> > > >
> >> > > > If what you mean is that "can invoke
> setIsProcessingBacklog(false)",
> >> > then
> >> > > > FLIP-309 supports doing this.
> >> > > >
> >> > > > If what you mean is that "any invocation of
> >> > > > setIsProcessingBacklog(false) will set the job's backlog status to
> >> > > > false", then FLIP-309 does not support this. I believe the existing
> >> > > > Java doc of this API and FLIP-309 are compatible with this
> >> > > > explanation.
> >> > > >
> >> > > > Note that we can not simply enforce the semantics of "any
> >> invocation of
> >> > > > setIsProcessingBacklog(false) will set the job's backlog status to
> >> > > false".
> >> > > > Suppose we have a job with two operators, where operatorA invokes
> >> > > > setIsProcessingBacklog(false) and operatorB invokes
> >> > > > setIsProcessingBacklog(true). There will be conflict if we use the
> >> > > > semantics of "any invocation of setIsProcessingBacklog(false) will
> >> set
> >> > > the
> >> > > > job's backlog status to false".
> >> > > >
> >> > > > Would this answer your question?
> >> > > >
> >> > > > Best,
> >> > > > Dong
> >> > > >
> >> > > >
> >> > > > > understand. From the API interface
> >> > > "ctx.setIsProcessingBacklog(boolean)",
> >> > > > > it allows users to invoke "setIsProcessingBacklog(false)". And
> >> > FLIP-309
> >> > > > > also says "MySQL CDC source should report
> >> isProcessingBacklog=false
> >> > > > > at the beginning of the changelog stage." If not, maybe we need
> to
> >> > > > revisit
> >> > > > > FLIP-309.
> >> > > >
> >> > > >
> >> > > > > Best,
> >> > > > > Jark
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > On Fri, 15 Sept 2023 at 08:41, Dong Lin <lindon...@gmail.com>
> >> wrote:
> >> > > > >
> >> > > > > > Hi Jark,
> >> > > > > >
> >> > > > > > Do you have any follow-up comment?
> >> > > > > >
> >> > > > > > My gut feeling is that if we need to support per-source
> >> > > > > > watermark lag specification in the future (not sure we have a
> >> > > > > > use-case for this right now), we can add such a config with a
> >> > > > > > follow-up FLIP. The job-level config will still be useful as it
> >> > > > > > makes users' configuration simpler for common scenarios.
> >> > > > > >
> >> > > > > > If it is OK, can we agree to make incremental progress for
> Flink
> >> > and
> >> > > > > start
> >> > > > > > a voting thread for this FLIP?
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > > Dong
> >> > > > > >
> >> > > > > >
> >> > > > > > On Mon, Sep 11, 2023 at 4:41 PM Jark Wu <imj...@gmail.com>
> >> wrote:
> >> > > > > >
> >> > > > > > > Hi Dong,
> >> > > > > > >
> >> > > > > > > Please see my comments inline.
> >> > > > > > >
> >> > > > > > > >  As a result, the proposed job-level
> >> > > > > > > > config will be applied only in the changelog stage. So
> >> there is
> >> > > no
> >> > > > > > > > difference between these two approaches in this particular
> >> > case,
> >> > > > > right?
> >> > > > > > >
> >> > > > > > > How can the job-level config be applied ONLY in the changelog
> >> > > > > > > stage? I think it is only possible if it is implemented by
> >> > > > > > > the CDC source itself, because the framework doesn't know
> >> > > > > > > which stage the source is in. Note that the CDC source may
> >> > > > > > > emit watermarks with a very small lag in the snapshot stage,
> >> > > > > > > and the job-level config may turn the backlog status into
> >> > > > > > > false.
> >> > > > > > >
> >> > > > > > > > On the other hand, per-source config will be necessary if
> >> users
> >> > > > want
> >> > > > > to
> >> > > > > > > > apply different watermark lag thresholds for different
> >> sources
> >> > in
> >> > > > the
> >> > > > > > > same
> >> > > > > > > > job.
> >> > > > > > >
> >> > > > > > > We also have different watermark delay definitions for each
> >> > > > > > > source, so I think it is also reasonable and necessary to
> >> > > > > > > have different watermark lags.
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > > Each source can have its own rule that specifies when the
> >> > backlog
> >> > > > can
> >> > > > > > be
> >> > > > > > > true
> >> > > > > > > > (e.g. MySql CDC says the backlog should be true during the
> >> > > snapshot
> >> > > > > > > stage).
> >> > > > > > > > And we can have a job-level config that specifies when the
> >> > > backlog
> >> > > > > > should
> >> > > > > > > > be true. Note that it is designed in such a way that none
> of
> >> > > these
> >> > > > > > rules
> >> > > > > > > > specify when the backlog should be false. That is why
> there
> >> is
> >> > no
> >> > > > > > > conflict
> >> > > > > > > > by definition.
> >> > > > > > >
> >> > > > > > > IIUC, FLIP-309 provides `setIsProcessingBacklog` to specify
> >> > > > > > > when the backlog is true and when it is FALSE. This conflicts
> >> > > > > > > with the job-level config as it will turn the status into
> >> > > > > > > true.
> >> > > > > > >
> >> > > > > > > > If I understand your comments correctly, you mean that we
> >> might
> >> > > > have
> >> > > > > a
> >> > > > > > > > Flink SQL DDL with user-defined watermark expressions. And
> >> > users
> >> > > > also
> >> > > > > > > want
> >> > > > > > > > to set the backlog to true if the watermark generated by
> >> that
> >> > > > > > > > user-specified expression exceeds a threshold.
> >> > > > > > >
> >> > > > > > > No. I mean the source may not support generating watermarks,
> >> > > > > > > so the watermark expression is applied in a following
> >> > > > > > > operator (instead of in the source operator). As a result,
> >> > > > > > > the watermark lag doesn't work in this case, which will
> >> > > > > > > confuse users.
> >> > > > > > >
> >> > > > > > > > You are right that this is a limitation. However, this is
> >> only
> >> > a
> >> > > > > > > short-term
> >> > > > > > > > limitation which we added to make sure that we can focus
> on
> >> the
> >> > > > > > > capability
> >> > > > > > > > to switch from backlog=true to backlog=false. In the
> >> future, we
> >> > > > will
> >> > > > > > > remove
> >> > > > > > > > this limitation and also support switching from
> >> backlog=false
> >> > to
> >> > > > > > > > backlog=true.
> >> > > > > > >
> >> > > > > > > I can understand it may be difficult to support runtime mode
> >> > > > > > > switching back and forth. However, I think this should be a
> >> > > > > > > limitation of FLIP-327, not FLIP-328. IIUC, FLIP-309 doesn't
> >> > > > > > > have this limitation, right? I just don't understand what the
> >> > > > > > > challenge is in switching a flag.
> >> > > > > > >
> >> > > > > > > Best,
> >> > > > > > > Jark
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > On Sun, 10 Sept 2023 at 19:44, Dong Lin <
> lindon...@gmail.com>
> >> > > wrote:
> >> > > > > > >
> >> > > > > > > > Hi Jark,
> >> > > > > > > >
> >> > > > > > > > Thanks for the comments. Please see my comments inline.
> >> > > > > > > >
> >> > > > > > > > On Sat, Sep 9, 2023 at 4:13 PM Jark Wu <imj...@gmail.com>
> >> > wrote:
> >> > > > > > > >
> >> > > > > > > > > Hi Xuannan,
> >> > > > > > > > >
> >> > > > > > > > > I leave my comments inline.
> >> > > > > > > > >
> >> > > > > > > > > > In the case where a user wants to
> >> > > > > > > > > > use a CDC source and also determine backlog status
> >> based on
> >> > > > > > watermark
> >> > > > > > > > > > lag, we still need to define the rule when that occurs
> >> > > > > > > > >
> >> > > > > > > > > This rule should be defined by the source itself (who
> >> knows
> >> > > > backlog
> >> > > > > > > > best),
> >> > > > > > > > > not by the framework. In the case of CDC source, it
> >> reports
> >> > > > > > > > isBacklog=true
> >> > > > > > > > > during snapshot stage, and report isBacklog=false during
> >> > > > changelog
> >> > > > > > > stage
> >> > > > > > > > if
> >> > > > > > > > > watermark-lag is within the threshold.
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > > > I am not sure I fully understand the difference between
> >> adding
> >> > a
> >> > > > > > > job-level
> >> > > > > > > > config vs. adding a per-source config.
> >> > > > > > > >
> >> > > > > > > > In the case of CDC, its watermark lag should be either
> >> > > > > > > > undefined or really large in the snapshot stage. As a
> >> > > > > > > > result, the proposed job-level config will be applied only
> >> > > > > > > > in the changelog stage. So there is no difference between
> >> > > > > > > > these two approaches in this particular case, right?
> >> > > > > > > >
> >> > > > > > > > There are two advantages of the job-level config over
> >> > per-source
> >> > > > > > config:
> >> > > > > > > >
> >> > > > > > > > 1) Configuration is simpler. For example, suppose a user
> >> has a
> >> > > > Flink
> >> > > > > > job
> >> > > > > > > > that consumes records from multiple Kafka sources and
> wants
> >> to
> >> > > > > > determine
> >> > > > > > > > backlog status for these Kafka sources using the same
> >> watermark
> >> > > lag
> >> > > > > > > > threshold, there is no need for users to repeatedly
> specify
> >> > this
> >> > > > > > > threshold
> >> > > > > > > > for each source.
> >> > > > > > > >
> >> > > > > > > > 2) There is a smaller number of public APIs overall. In
> >> > > > > > > > particular, instead of repeatedly adding a
> >> > > > > > > > setProcessingBacklogWatermarkLagThreshold() API for every
> >> > > > > > > > source operator that has event-time watermark lag defined,
> >> > > > > > > > we only need to add one job-level config. Fewer public APIs
> >> > > > > > > > mean better simplicity and maintainability in general.
> >> > > > > > > >
> >> > > > > > > > On the other hand, per-source config will be necessary if
> >> > > > > > > > users want to apply different watermark lag thresholds for
> >> > > > > > > > different sources in the same job. Personally, I find it a
> >> > > > > > > > bit counter-intuitive for users to specify different
> >> > > > > > > > watermark lag thresholds in the same job.
> >> > > > > > > >
> >> > > > > > > > Do you think there is any real-world use-case that requires
> >> > > > > > > > this? Could you provide a specific use-case where per-source
> >> > > > > > > > config can provide an advantage over the job-level config?
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > > I think it's not intuitive to combine it with the
> logical
> >> OR
> >> > > > > > operation.
> >> > > > > > > > > Even for the
> >> > > > > > > > > combination logic of backlog status from different
> >> channels,
> >> > > > > FLIP-309
> >> > > > > > > > said
> >> > > > > > > > > it is
> >> > > > > > > > > "up to the operator to determine its output records'
> >> > isBacklog
> >> > > > > value"
> >> > > > > > > and
> >> > > > > > > > > proposed
> >> > > > > > > > > 3 different strategies. Therefore, I think backlog
> status
> >> > from
> >> > > a
> >> > > > > > single
> >> > > > > > > > > source should
> >> > > > > > > > > be up to the source.
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > For both the job-level config and the per-source config,
> it
> >> is
> >> > > > > > eventually
> >> > > > > > > > up to the user to decide the computation logic of the
> >> backlog
> >> > > > status.
> >> > > > > > > > Whether this mechanism is implemented at the per-source
> >> level
> >> > or
> >> > > > > > > framework
> >> > > > > > > > level is probably more like an implementation detail.
> >> > > > > > > >
> >> > > > > > > > Eventually, I think the choice between these two
> approaches
> >> > > depends
> >> > > > > on
> >> > > > > > > > whether we have any use-case for users to specify
> different
> >> > > > watermark
> >> > > > > > lag
> >> > > > > > > > thresholds in the same job.
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > > IMO, a better API design is not about how to resolve
> >> > > > > > > > > conflicts but about not introducing conflicts.
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > Just to clarify, the current FLIP does not introduce any
> >> > > conflict.
> >> > > > > Each
> >> > > > > > > > source can have its own rule that specifies when the
> backlog
> >> > can
> >> > > be
> >> > > > > > true
> >> > > > > > > > (e.g. MySql CDC says the backlog should be true during the
> >> > > snapshot
> >> > > > > > > stage).
> >> > > > > > > > And we can have a job-level config that specifies when the
> >> > > backlog
> >> > > > > > should
> >> > > > > > > > be true. Note that it is designed in such a way that none
> of
> >> > > these
> >> > > > > > rules
> >> > > > > > > > specify when the backlog should be false. That is why
> there
> >> is
> >> > no
> >> > > > > > > conflict
> >> > > > > > > > by definition.
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > > Letting the source determine backlog status removes the
> >> > > > > > > > > conflicts, and I don't see big disadvantages.
> >> > > > > > > > >
> >> > > > > > > > > > It should not confuse the user that
> >> > > > > > > > > > DataStream#assignTimestampsAndWatermarks doesn't work
> >> with
> >> > > > > > > > > > backlog.watermark-lag-threshold, as it is not a
> source.
> >> > > > > > > > >
> >> > > > > > > > > Hmm, so this configuration may confuse Flink SQL users,
> >> > because
> >> > > > all
> >> > > > > > > > > watermarks
> >> > > > > > > > > are defined on the source DDL, but it may use a separate
> >> > > operator
> >> > > > > to
> >> > > > > > > emit
> >> > > > > > > > > watermarks
> >> > > > > > > > > if the source doesn't support emitting watermarks.
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > > > If I understand your comments correctly, you mean that we
> >> might
> >> > > > have
> >> > > > > a
> >> > > > > > > > Flink SQL DDL with user-defined watermark expressions. And
> >> > users
> >> > > > also
> >> > > > > > > want
> >> > > > > > > > to set the backlog to true if the watermark generated by
> >> that
> >> > > > > > > > user-specified expression exceeds a threshold.
> >> > > > > > > >
> >> > > > > > > > That is a good point and use-case. I agree we should also
> >> cover
> >> > > > this
> >> > > > > > > > scenario. And we can update FLIP-328 to mention that the
> >> > > job-level
> >> > > > > > config
> >> > > > > > > > will also be applicable when the watermark derived from
> the
> >> > Flink
> >> > > > SQL
> >> > > > > > DDL
> >> > > > > > > > exceeds this threshold. Would this address your concern?
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > > > I think the description in the FLIP actually means the
> >> > other
> >> > > > way
> >> > > > > > > > > > around, where the job can never switch back to batch
> >> mode
> >> > > once
> >> > > > it
> >> > > > > > has
> >> > > > > > > > > > switched into streaming mode. This is to align with
> the
> >> > > current
> >> > > > > > state
> >> > > > > > > > > > of FLIP-327[1], where only switching from batch to
> >> stream
> >> > > mode
> >> > > > is
> >> > > > > > > > > > supported.
> >> > > > > > > > >
> >> > > > > > > > > This sounds like a limitation of FLIP-327 (that
> execution
> >> > mode
> >> > > > > > depends
> >> > > > > > > on
> >> > > > > > > > > backlog status).
> >> > > > > > > > > But the backlog status shouldn't have this limitation,
> >> > because
> >> > > it
> >> > > > > is
> >> > > > > > > not
> >> > > > > > > > > only used for execution
> >> > > > > > > > > switching.
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > > > You are right that this is a limitation. However, this is
> >> only
> >> > a
> >> > > > > > > short-term
> >> > > > > > > > limitation which we added to make sure that we can focus
> on
> >> the
> >> > > > > > > capability
> >> > > > > > > > to switch from backlog=true to backlog=false. In the
> >> future, we
> >> > > > will
> >> > > > > > > remove
> >> > > > > > > > this limitation and also support switching from
> >> backlog=false
> >> > to
> >> > > > > > > > backlog=true.
> >> > > > > > > >
> >> > > > > > > > The capability to switch from backlog=true to backlog=false
> >> > > > > > > > will mitigate a lot of problems we are facing now, as it is
> >> > > > > > > > common for users to start a Flink job to process backlog
> >> > > > > > > > data followed by real-time data. On the other hand,
> >> > > > > > > > switching from backlog=false to backlog=true is useful when
> >> > > > > > > > there is a traffic spike while the Flink job is processing
> >> > > > > > > > real-time data, which is also useful to address but less
> >> > > > > > > > important than the previous one.
> >> > > > > > > >
> >> > > > > > > > Given that both features require considerable changes to
> the
> >> > > > > underlying
> >> > > > > > > > runtime, we think it might be useful and safe to tackle
> them
> >> > one
> >> > > by
> >> > > > > > one.
> >> > > > > > > >
> >> > > > > > > > Thanks again for the comments. Please let us know what you
> >> > think.
> >> > > > > > > >
> >> > > > > > > > Best,
> >> > > > > > > > Dong
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > > Best,
> >> > > > > > > > > Jark
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > > On Fri, 8 Sept 2023 at 19:09, Xuannan Su <
> >> > > suxuanna...@gmail.com>
> >> > > > > > > wrote:
> >> > > > > > > > >
> >> > > > > > > > > > Hi Jark and Leonard,
> >> > > > > > > > > >
> >> > > > > > > > > > Thanks for the comments. Please see my reply below.
> >> > > > > > > > > >
> >> > > > > > > > > > @Jark
> >> > > > > > > > > >
> >> > > > > > > > > > > I think a better API doesn't compete with itself.
> >> > > Therefore,
> >> > > > > I'm
> >> > > > > > in
> >> > > > > > > > > > favor of
> >> > > > > > > > > > > supporting the watermark lag threshold for each
> source
> >> > > > without
> >> > > > > > > > > > introducing
> >> > > > > > > > > > > any framework API and configuration.
> >> > > > > > > > > >
> >> > > > > > > > > > I don't think supporting the watermark lag threshold
> for
> >> > each
> >> > > > > > source
> >> > > > > > > > > > can avoid the competition problem. In the case where a
> >> user
> >> > > > wants
> >> > > > > > to
> >> > > > > > > > > > use a CDC source and also determine backlog status
> >> based on
> >> > > > > > watermark
> >> > > > > > > > > > lag, we still need to define the rule when that
> occurs.
> >> > With
> >> > > > that
> >> > > > > > > > > > said, I think it is more intuitive to combine it with
> >> the
> >> > > > logical
> >> > > > > > OR
> >> > > > > > > > > > operation, as the strategies (FLIP-309, FLIP-328) only
> >> > > > determine
> >> > > > > > when
> >> > > > > > > > > > the source's backlog status should be True. What do
> you
> >> > > think?
> >> > > > > > > > > >
> >> > > > > > > > > > > Besides, this can address another concern that the
> >> > > > > > > > > > > watermark may be generated by
> >> > > > > > > > > > > DataStream#assignTimestampsAndWatermarks which doesn't
> >> > > > > > > > > > > work with the backlog.watermark-lag-threshold job config
> >> > > > > > > > > >
> >> > > > > > > > > > The description of the configuration explicitly states
> >> that
> >> > > "a
> >> > > > > > source
> >> > > > > > > > > > would report isProcessingBacklog=true if its watermark
> >> lag
> >> > > > > exceeds
> >> > > > > > > the
> >> > > > > > > > > > configured value". It should not confuse the user that
> >> > > > > > > > > > DataStream#assignTimestampsAndWatermarks doesn't work
> >> with
> >> > > > > > > > > > backlog.watermark-lag-threshold, as it is not a
> source.
> >> > > > > > > > > >
> >> > > > > > > > > > > Does that mean the job can never switch back to
> >> > > > > > > > > > > streaming mode once it switches into backlog mode? That
> >> > > > > > > > > > > does not sound like a complete FLIP to me. Is it
> >> > > > > > > > > > > possible to support switching back in this FLIP?
> >> > > > > > > > > >
> >> > > > > > > > > > I think the description in the FLIP actually means the
> >> > other
> >> > > > way
> >> > > > > > > > > > around, where the job can never switch back to batch
> >> mode
> >> > > once
> >> > > > it
> >> > > > > > has
> >> > > > > > > > > > switched into streaming mode. This is to align with
> the
> >> > > current
> >> > > > > > state
> >> > > > > > > > > > of FLIP-327[1], where only switching from batch to
> >> stream
> >> > > mode
> >> > > > is
> >> > > > > > > > > > supported.
> >> > > > > > > > > >
> >> > > > > > > > > > @Leonard
> >> > > > > > > > > >
> >> > > > > > > > > > > > The FLIP describes that: And it should report isProcessingBacklog=false
> >> > > > > > > > > > > > at the beginning of the snapshot stage.
> >> > > > > > > > > > > This should be “changelog stage”
> >> > > > > > > > > >
> >> > > > > > > > > > I think the description is in FLIP-309. Thanks for
> >> pointing
> >> > > > out.
> >> > > > > I
> >> > > > > > > > > > updated the description.
> >> > > > > > > > > >
> >> > > > > > > > > > > I'm not sure if it's enough to support this feature only in FLIP-27
> >> > > > > > > > > > > Source. Although we are pushing the SourceFunction API to be removed,
> >> > > > > > > > > > > these APIs will survive one or two versions in the Flink repo before
> >> > > > > > > > > > > they are actually removed.
> >> > > > > > > > > >
> >> > > > > > > > > > I agree that it is good to support the SourceFunction
> >> API.
> >> > > > > However,
> >> > > > > > > > > > given that the SourceFunction API is marked as
> >> deprecated,
> >> > I
> >> > > > > think
> >> > > > > > I
> >> > > > > > > > > > will prioritize supporting the FLIP-27 Source. We can
> >> > support
> >> > > > the
> >> > > > > > > > > > SourceFunction API after the
> >> > > > > > > > > > FLIP-27 source. What do you think?
> >> > > > > > > > > >
> >> > > > > > > > > > Best regards,
> >> > > > > > > > > > Xuannan
> >> > > > > > > > > >
> >> > > > > > > > > > [1]
> >> > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data
> >> > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > > > On Fri, Sep 8, 2023 at 1:02 AM Leonard Xu <
> >> > xbjt...@gmail.com
> >> > > >
> >> > > > > > wrote:
> >> > > > > > > > > > >
> >> > > > > > > > > > > Thanks Xuannan for driving this FLIP !
> >> > > > > > > > > > >
> >> > > > > > > > > > > The proposal generally looks good to me, but I still
> >> left
> >> > > > some
> >> > > > > > > > > comments:
> >> > > > > > > > > > >
> >> > > > > > > > > > > > One more question about the FLIP is that the FLIP says "Note that
> >> > > > > > > > > > > > this config does not support switching source's isProcessingBacklog
> >> > > > > > > > > > > > from false to true for now." Does that mean the job can never go
> >> > > > > > > > > > > > back to streaming mode once it switches into backlog mode? It sounds
> >> > > > > > > > > > > > like not a complete FLIP to me. Is it possible to support switching
> >> > > > > > > > > > > > back in this FLIP?
> >> > > > > > > > > > > +1 for Jark’s concern. IIUC, the state transition of
> >> > > > > > > > > > > IsProcessingBacklog depends on whether the data the source is
> >> > > > > > > > > > > processing is backlog data or not. Different sources will have
> >> > > > > > > > > > > different backlog statuses, which may change over time. From a
> >> > > > > > > > > > > general perspective, we should not have this restriction.
> >> > > > > > > > > > >
> >> > > > > > > > > > > > The FLIP describes that: And it should report isProcessingBacklog=false
> >> > > > > > > > > > > > at the beginning of the snapshot stage.
> >> > > > > > > > > > > This should be “changelog stage”
> >> > > > > > > > > > >
> >> > > > > > > > > > > I'm not sure if it's enough to support this feature only in FLIP-27
> >> > > > > > > > > > > Source. Although we are pushing the SourceFunction API to be removed,
> >> > > > > > > > > > > these APIs will survive one or two versions in the Flink repo before
> >> > > > > > > > > > > they are actually removed.
> >> > > > > > > > > > >
> >> > > > > > > > > > > Best,
> >> > > > > > > > > > > Leonard
> >> > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > Best,
> >> > > > > > > > > > > > Jark
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > On Thu, 7 Sept 2023 at 13:51, Xuannan Su <
> >> > > > > > suxuanna...@gmail.com>
> >> > > > > > > > > > wrote:
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >> Hi all,
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >> Thank you for all the reviews and suggestions.
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >> I believe all the comments have been addressed.
> If
> >> > there
> >> > > > are
> >> > > > > > no
> >> > > > > > > > > > > >> further comments, I plan to open the voting
> thread
> >> for
> >> > > > this
> >> > > > > > FLIP
> >> > > > > > > > > early
> >> > > > > > > > > > > >> next week.
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >> Best regards,
> >> > > > > > > > > > > >> Xuannan
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >> On Thu, Sep 7, 2023 at 12:09 AM Jing Ge
> >> > > > > > > > <j...@ververica.com.invalid
> >> > > > > > > > > >
> >> > > > > > > > > > > >> wrote:
> >> > > > > > > > > > > >>>
> >> > > > > > > > > > > >>> Hi Xuannan,
> >> > > > > > > > > > > >>>
> >> > > > > > > > > > > >>> I thought FLIP-328 will compete with FLIP-309
> >> while
> >> > > > setting
> >> > > > > > the
> >> > > > > > > > > > value of
> >> > > > > > > > > > > >>> the backlog. Understood. Thanks for the hint.
> >> > > > > > > > > > > >>>
> >> > > > > > > > > > > >>> Best regards,
> >> > > > > > > > > > > >>> Jing
> >> > > > > > > > > > > >>>
> >> > > > > > > > > > > >>> On Wed, Sep 6, 2023 at 12:12 PM Xuannan Su <
> >> > > > > > > > suxuanna...@gmail.com>
> >> > > > > > > > > > > >> wrote:
> >> > > > > > > > > > > >>>
> >> > > > > > > > > > > >>>> Hi Jing,
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>>> Thank you for the clarification.
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>>> For the use case you mentioned, I believe we
> can
> >> > > utilize
> >> > > > > the
> >> > > > > > > > > > > >>>> HybridSource, as updated in FLIP-309[1], to
> >> > determine
> >> > > > the
> >> > > > > > > > backlog
> >> > > > > > > > > > > >>>> status. For example, if the user wants to
> process
> >> > data
> >> > > > > > before
> >> > > > > > > > > time T
> >> > > > > > > > > > > >>>> in batch mode and after time T in stream mode,
> >> they
> >> > > can
> >> > > > > set
> >> > > > > > > the
> >> > > > > > > > > > first
> >> > > > > > > > > > > >>>> source of the HybridSource to read up to time T
> >> and
> >> > > the
> >> > > > > last
> >> > > > > > > > > source
> >> > > > > > > > > > of
> >> > > > > > > > > > > >>>> the HybridSource to read from time T.
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>>> Best,
> >> > > > > > > > > > > >>>> Xuannan
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>>> [1]
> >> > > > > > > > > > > >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>>> On Mon, Sep 4, 2023 at 10:36 PM Jing Ge
> >> > > > > > > > > <j...@ververica.com.invalid
> >> > > > > > > > > > >
> >> > > > > > > > > > > >>>> wrote:
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> Hi Xuannan,
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> Thanks for the clarification.
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> 3. Event time and processing time are two different things. It might
> >> > > > > > > > > > > >>>>> be rarely used, but conceptually, users can process data in the past
> >> > > > > > > > > > > >>>>> within a specific time range in the streaming mode. All data before
> >> > > > > > > > > > > >>>>> that range will be considered as backlog and need to be processed in
> >> > > > > > > > > > > >>>>> the batch mode, like, e.g. the Present Perfect Progressive tense used
> >> > > > > > > > > > > >>>>> in the English language.
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> Best regards,
> >> > > > > > > > > > > >>>>> Jing
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> On Thu, Aug 31, 2023 at 4:45 AM Xuannan Su <
> >> > > > > > > > > suxuanna...@gmail.com>
> >> > > > > > > > > > > >>>> wrote:
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>>> Hi Jing,
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> Thanks for the reply.
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> 1. You are absolutely right that the
> watermark
> >> lag
> >> > > > > > threshold
> >> > > > > > > > > must
> >> > > > > > > > > > > >> be
> >> > > > > > > > > > > >>>>>> carefully set with a thorough understanding
> of
> >> > > > watermark
> >> > > > > > > > > > > >> generation.
> >> > > > > > > > > > > >>>> It is
> >> > > > > > > > > > > >>>>>> crucial for users to take into account the
> >> > > > > > WatermarkStrategy
> >> > > > > > > > > when
> >> > > > > > > > > > > >>>> setting
> >> > > > > > > > > > > >>>>>> the watermark lag threshold.
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> 2. Regarding pure processing-time based
> stream
> >> > > > > processing
> >> > > > > > > > jobs,
> >> > > > > > > > > > > >>>>>> alternative strategies will be implemented to
> >> > > > determine
> >> > > > > > > > whether
> >> > > > > > > > > > the
> >> > > > > > > > > > > >>>> job is
> >> > > > > > > > > > > >>>>>> processing backlog data. I have outlined two
> >> > > possible
> >> > > > > > > > strategies
> >> > > > > > > > > > > >> below:
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> - Based on the source operator's state. For
> >> > example,
> >> > > > > when
> >> > > > > > > > MySQL
> >> > > > > > > > > > CDC
> >> > > > > > > > > > > >>>> source
> >> > > > > > > > > > > >>>>>> is reading snapshot, it can claim
> >> isBacklog=true.
> >> > > > > > > > > > > >>>>>> - Based on metrics. For example, when
> >> > > > > busyTimeMsPerSecond
> >> > > > > > > (or
> >> > > > > > > > > > > >>>>>> backPressuredTimeMsPerSecond) >
> >> > > > > user_specified_threshold,
> >> > > > > > > then
> >> > > > > > > > > > > >>>>>> isBacklog=true.
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> As for the strategies proposed in this FLIP, they rely on generated
> >> > > > > > > > > > > >>>>>> watermarks. Therefore, if a user intends for the job to detect
> >> > > > > > > > > > > >>>>>> backlog status based on watermarks, it is necessary to generate the
> >> > > > > > > > > > > >>>>>> watermark.
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> 3. I'm afraid I'm not fully grasping your
> >> > question.
> >> > > > From
> >> > > > > > my
> >> > > > > > > > > > > >>>> understanding,
> >> > > > > > > > > > > >>>>>> it should work in both cases. When event
> times
> >> are
> >> > > > close
> >> > > > > > to
> >> > > > > > > > the
> >> > > > > > > > > > > >>>> processing
> >> > > > > > > > > > > >>>>>> time, resulting in watermarks close to the
> >> > > processing
> >> > > > > > time,
> >> > > > > > > > the
> >> > > > > > > > > > > >> job is
> >> > > > > > > > > > > >>>> not
> >> > > > > > > > > > > >>>>>> processing backlog data. On the other hand,
> >> when
> >> > > event
> >> > > > > > times
> >> > > > > > > > are
> >> > > > > > > > > > > >> far
> >> > > > > > > > > > > >>>> from
> >> > > > > > > > > > > >>>>>> processing time, causing watermarks to also
> be
> >> > > > distant,
> >> > > > > if
> >> > > > > > > the
> >> > > > > > > > > lag
> >> > > > > > > > > > > >>>>>> surpasses the defined threshold, the job is
> >> > > considered
> >> > > > > > > > > processing
> >> > > > > > > > > > > >>>> backlog
> >> > > > > > > > > > > >>>>>> data.
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> Best,
> >> > > > > > > > > > > >>>>>> Xuannan
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>>> On Aug 31, 2023, at 02:56, Jing Ge
> >> > > > > > > > <j...@ververica.com.INVALID
> >> > > > > > > > > >
> >> > > > > > > > > > > >>>> wrote:
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> Hi Xuannan,
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> Thanks for the clarification. That is the
> part
> >> > > where
> >> > > > I
> >> > > > > am
> >> > > > > > > > > trying
> >> > > > > > > > > > > >> to
> >> > > > > > > > > > > >>>>>>> understand your thoughts. I have some
> >> follow-up
> >> > > > > > questions:
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> 1. It depends strongly on the watermarkStrategy and on what the
> >> > > > > > > > > > > >>>>>>> customized watermark generation looks like. It mixes business logic
> >> > > > > > > > > > > >>>>>>> with technical implementation and the technical data processing
> >> > > > > > > > > > > >>>>>>> mode. The value of the watermark lag threshold must be set very
> >> > > > > > > > > > > >>>>>>> carefully. If the value is too small, then any time the watermark
> >> > > > > > > > > > > >>>>>>> generation logic is changed (business logic changes lead to the
> >> > > > > > > > > > > >>>>>>> threshold getting exceeded), the same job might surprisingly be
> >> > > > > > > > > > > >>>>>>> running in backlog processing mode, i.e. a butterfly effect.
> >> > > > > > > > > > > >>>>>>> Comprehensive documentation is required to avoid any confusion for
> >> > > > > > > > > > > >>>>>>> the users.
> >> > > > > > > > > > > >>>>>>> 2. Like Jark already mentioned, use cases
> >> that do
> >> > > not
> >> > > > > > have
> >> > > > > > > > > > > >>>> watermarks,
> >> > > > > > > > > > > >>>>>>> like pure processing-time based stream
> >> > > processing[1]
> >> > > > > are
> >> > > > > > > not
> >> > > > > > > > > > > >>>> covered. It
> >> > > > > > > > > > > >>>>>> is
> >> > > > > > > > > > > >>>>>>> more or less a trade-off solution that does
> >> not
> >> > > > support
> >> > > > > > > such
> >> > > > > > > > > use
> >> > > > > > > > > > > >>>> cases
> >> > > > > > > > > > > >>>>>> and
> >> > > > > > > > > > > >>>>>>> appropriate documentation is required.
> Forcing
> >> > them
> >> > > > to
> >> > > > > > > > > explicitly
> >> > > > > > > > > > > >>>>>> generate
> >> > > > > > > > > > > >>>>>>> watermarks that are never needed just
> because
> >> of
> >> > > this
> >> > > > > > does
> >> > > > > > > > not
> >> > > > > > > > > > > >> sound
> >> > > > > > > > > > > >>>>>> like a
> >> > > > > > > > > > > >>>>>>> proper solution.
> >> > > > > > > > > > > >>>>>>> 3. If I am not mistaken, it only works for
> use
> >> > > cases
> >> > > > > > where
> >> > > > > > > > > event
> >> > > > > > > > > > > >>>> times
> >> > > > > > > > > > > >>>>>> are
> >> > > > > > > > > > > >>>>>>> very close to the processing times, because
> >> the
> >> > > wall
> >> > > > > > clock
> >> > > > > > > is
> >> > > > > > > > > > > >> used to
> >> > > > > > > > > > > >>>>>>> calculate the watermark lag and the
> watermark
> >> is
> >> > > > > > generated
> >> > > > > > > > > based
> >> > > > > > > > > > > >> on
> >> > > > > > > > > > > >>>> the
> >> > > > > > > > > > > >>>>>>> event time.
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> Best regards,
> >> > > > > > > > > > > >>>>>>> Jing
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> [1]
> >> > > > > > > > > > > >>>>>>> https://github.com/apache/flink/blob/2c50b4e956305426f478b726d4de4a640a16b810/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L236
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> On Wed, Aug 30, 2023 at 4:06 AM Xuannan Su <
> >> > > > > > > > > > > >> suxuanna...@gmail.com>
> >> > > > > > > > > > > >>>>>> wrote:
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>>> Hi Jing,
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>> Thank you for the suggestion.
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>> The definition of watermark lag is the same
> >> as
> >> > the
> >> > > > > > > > > watermarkLag
> >> > > > > > > > > > > >>>> metric
> >> > > > > > > > > > > >>>>>> in
> >> > > > > > > > > > > >>>>>>>> FLIP-33[1]. More specifically, the
> watermark
> >> lag
> >> > > > > > > > > is
> >> > > > > > > > > > > >>>>>> computed at
> >> > > > > > > > > > > >>>>>>>> the time when a watermark is emitted
> >> downstream
> >> > in
> >> > > > the
> >> > > > > > > > > following
> >> > > > > > > > > > > >>>> way:
> >> > > > > > > > > > > >>>>>>>> watermarkLag = CurrentTime - Watermark. I
> >> have
> >> > > added
> >> > > > > > this
> >> > > > > > > > > > > >>>> description to
> >> > > > > > > > > > > >>>>>>>> the FLIP.
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>> I hope this addresses your concern.
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>> Best,
> >> > > > > > > > > > > >>>>>>>> Xuannan
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>> [1]
> >> > > > > > > > > > > >>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>>> On Aug 28, 2023, at 01:04, Jing Ge
> >> > > > > > > > > <j...@ververica.com.INVALID
> >> > > > > > > > > > > >>>
> >> > > > > > > > > > > >>>> wrote:
> >> > > > > > > > > > > >>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>> Hi Xuannan,
> >> > > > > > > > > > > >>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>> Thanks for the proposal. +1 for me.
> >> > > > > > > > > > > >>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>> There is one tiny thing that I am not sure
> >> if I
> >> > > > > > > understand
> >> > > > > > > > it
> >> > > > > > > > > > > >>>>>> correctly.
> >> > > > > > > > > > > >>>>>>>>> Since there will be many different
> >> > > > > WatermarkStrategies
> >> > > > > > > and
> >> > > > > > > > > > > >>>> different
> >> > > > > > > > > > > >>>>>>>>> WatermarkGenerators. Could you please
> update
> >> > the
> >> > > > FLIP
> >> > > > > > and
> >> > > > > > > > add
> >> > > > > > > > > > > >> the
> >> > > > > > > > > > > >>>>>>>>> description of how the watermark lag is
> >> > > calculated
> >> > > > > > > exactly?
> >> > > > > > > > > > > >> E.g.
> >> > > > > > > > > > > >>>>>>>> Watermark
> >> > > > > > > > > > > >>>>>>>>> lag = A - B, where A is the timestamp of the
> >> > > > watermark
> >> > > > > > > > emitted
> >> > > > > > > > > > > >> to the
> >> > > > > > > > > > > >>>>>>>>> downstream and B is....(this is the part I
> >> am
> >> > not
> >> > > > > > really
> >> > > > > > > > sure
> >> > > > > > > > > > > >> after
> >> > > > > > > > > > > >>>>>>>> reading
> >> > > > > > > > > > > >>>>>>>>> the FLIP).
> >> > > > > > > > > > > >>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>> Best regards,
> >> > > > > > > > > > > >>>>>>>>> Jing
> >> > > > > > > > > > > >>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>> On Mon, Aug 21, 2023 at 9:03 AM Xuannan
> Su <
> >> > > > > > > > > > > >> suxuanna...@gmail.com>
> >> > > > > > > > > > > >>>>>>>> wrote:
> >> > > > > > > > > > > >>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>> Hi Jark,
> >> > > > > > > > > > > >>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>> Thanks for the comments.
> >> > > > > > > > > > > >>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>> I agree that the current solution cannot
> >> > support
> >> > > > > jobs
> >> > > > > > > that
> >> > > > > > > > > > > >> cannot
> >> > > > > > > > > > > >>>>>> define
> >> > > > > > > > > > > >>>>>>>>>> watermarks. However, after considering
> the
> >> > > > > > > > > > > >> pending-record-based
> >> > > > > > > > > > > >>>>>>>> solution, I
> >> > > > > > > > > > > >>>>>>>>>> believe the current solution is superior
> >> for
> >> > the
> >> > > > > > target
> >> > > > > > > > use
> >> > > > > > > > > > > >> case
> >> > > > > > > > > > > >>>> as it
> >> > > > > > > > > > > >>>>>>>> is
> >> > > > > > > > > > > >>>>>>>>>> more intuitive for users. The backlog
> >> status
> >> > > gives
> >> > > > > > users
> >> > > > > > > > the
> >> > > > > > > > > > > >>>> ability
> >> > > > > > > > > > > >>>>>> to
> >> > > > > > > > > > > >>>>>>>>>> balance between throughput and latency.
> >> Making
> >> > > > this
> >> > > > > > > > > trade-off
> >> > > > > > > > > > > >>>> decision
> >> > > > > > > > > > > >>>>>>>>>> based on the watermark lag is more
> >> intuitive
> >> > > from
> >> > > > > the
> >> > > > > > > > user's
> >> > > > > > > > > > > >>>>>>>> perspective.
> >> > > > > > > > > > > >>>>>>>>>> For instance, a user can decide that if
> the
> >> > job
> >> > > > lags
> >> > > > > > > > behind
> >> > > > > > > > > > > >> the
> >> > > > > > > > > > > >>>>>> current
> >> > > > > > > > > > > >>>>>>>>>> time by more than 1 hour, the result is
> not
> >> > > > usable.
> >> > > > > In
> >> > > > > > > > that
> >> > > > > > > > > > > >> case,
> >> > > > > > > > > > > >>>> we
> >> > > > > > > > > > > >>>>>> can
> >> > > > > > > > > > > >>>>>>>>>> optimize for throughput when the data
> lags
> >> > > behind
> >> > > > by
> >> > > > > > > more
> >> > > > > > > > > > > >> than an
> >> > > > > > > > > > > >>>>>> hour.
> >> > > > > > > > > > > >>>>>>>>>> With the pending-record-based solution,
> >> it's
> >> > > > > > challenging
> >> > > > > > > > for
> >> > > > > > > > > > > >>>> users to
> >> > > > > > > > > > > >>>>>>>>>> determine when to optimize for throughput
> >> and
> >> > > when
> >> > > > > to
> >> > > > > > > > > > > >> prioritize
> >> > > > > > > > > > > >>>>>>>> latency.
> >> > > > > > > > > > > >>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>> Regarding the limitations of the
> >> > watermark-based
> >> > > > > > > solution:
> >> > > > > > > > > > > >>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>> 1. The current solution can support jobs
> >> with
> >> > > > > sources
> >> > > > > > > that
> >> > > > > > > > > > > >> have
> >> > > > > > > > > > > >>>> event
> >> > > > > > > > > > > >>>>>>>>>> time. Users can always define a watermark
> >> at
> >> > the
> >> > > > > > source
> >> > > > > > > > > > > >> operator,
> >> > > > > > > > > > > >>>> even
> >> > > > > > > > > > > >>>>>>>> if
> >> > > > > > > > > > > >>>>>>>>>> it's not used by downstream operators,
> >> such as
> >> > > > > > streaming
> >> > > > > > > > > join
> >> > > > > > > > > > > >> and
> >> > > > > > > > > > > >>>>>>>> unbounded
> >> > > > > > > > > > > >>>>>>>>>> aggregate.
> >> > > > > > > > > > > >>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>> 2. I don't believe it's accurate to say
> that
> >> > the
> >> > > > > > > watermark
> >> > > > > > > > > lag
> >> > > > > > > > > > > >> will
> >> > > > > > > > > > > >>>>>> keep
> >> > > > > > > > > > > >>>>>>>>>> increasing if no data is generated in
> >> Kafka.
> >> > The
> >> > > > > > > watermark
> >> > > > > > > > > > > >> lag and
> >> > > > > > > > > > > >>>>>>>> backlog
> >> > > > > > > > > > > >>>>>>>>>> status are determined at the moment when
> >> the
> >> > > > > watermark
> >> > > > > > > is
> >> > > > > > > > > > > >> emitted
> >> > > > > > > > > > > >>>> to
> >> > > > > > > > > > > >>>>>> the
> >> > > > > > > > > > > >>>>>>>>>> downstream operator. If no data is
> emitted
> >> > from
> >> > > > the
> >> > > > > > > > source,
> >> > > > > > > > > > > >> the
> >> > > > > > > > > > > >>>>>>>> watermark
> >> > > > > > > > > > > >>>>>>>>>> lag and backlog status will not be
> >> updated. If
> >> > > the
> >> > > > > > > > > > > >>>> WatermarkStrategy
> >> > > > > > > > > > > >>>>>>>> with
> >> > > > > > > > > > > >>>>>>>>>> idleness is used, the source becomes
> >> > non-backlog
> >> > > > > when
> >> > > > > > it
> >> > > > > > > > > > > >> becomes
> >> > > > > > > > > > > >>>> idle.
> >> > > > > > > > > > > >>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>> 3. I think watermark lag is more
> intuitive
> >> to
> >> > > > > > determine
> >> > > > > > > > if a
> >> > > > > > > > > > > >> job
> >> > > > > > > > > > > >>>> is
> >> > > > > > > > > > > >>>>>>>>>> processing backlog data. Even when using
> >> > pending
> >> > > > > > > records,
> >> > > > > > > > it
> >> > > > > > > > > > > >>>> faces a
> >> > > > > > > > > > > >>>>>>>>>> similar issue. For example, if the source
> >> has
> >> > 1K
> >> > > > > > pending
> >> > > > > > > > > > > >> records,
> >> > > > > > > > > > > >>>>>> those
> >> > > > > > > > > > > >>>>>>>>>> records can span from 1 day to 1 hour
> to 1
> >> > > > second.
> >> > > > > If
> >> > > > > > > the
> >> > > > > > > > > > > >> records
> >> > > > > > > > > > > >>>>>> span
> >> > > > > > > > > > > >>>>>>>> 1
> >> > > > > > > > > > > >>>>>>>>>> day, it's probably best to optimize for
> >> > > > throughput.
> >> > > > > If
> >> > > > > > > > they
> >> > > > > > > > > > > >> span 1
> >> > > > > > > > > > > >>>>>>>> hour, it
> >> > > > > > > > > > > >>>>>>>>>> depends on the business logic. If they
> >> span 1
> >> > > > > second,
> >> > > > > > > > > > > >> optimizing
> >> > > > > > > > > > > >>>> for
> >> > > > > > > > > > > >>>>>>>>>> latency is likely the better choice.
> >> > > > > > > > > > > >>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>> In summary, I believe the watermark-based
> >> > > solution
> >> > > > > is
> >> > > > > > a
> >> > > > > > > > > > > >> superior
> >> > > > > > > > > > > >>>>>> choice
> >> > > > > > > > > > > >>>>>>>>>> for the target use case where
> >> watermark/event
> >> > > time
> >> > > > > can
> >> > > > > > > be
> >> > > > > > > > > > > >> defined.
> >> > > > > > > > > > > >>>>>>>>>> Additionally, I haven't come across a
> >> scenario
> >> > > > that
> >> > > > > > > > requires
> >> > > > > > > > > > > >>>>>> low-latency
> >> > > > > > > > > > > >>>>>>>>>> processing and reads from a source that
> >> cannot
> >> > > > > define
> >> > > > > > > > > > > >> watermarks.
> >> > > > > > > > > > > >>>> If
> >> > > > > > > > > > > >>>>>> we
> >> > > > > > > > > > > >>>>>>>>>> encounter such a use case, we can create
> >> > another
> >> > > > > FLIP
> >> > > > > > to
> >> > > > > > > > > > > >> address
> >> > > > > > > > > > > >>>> those
> >> > > > > > > > > > > >>>>>>>>>> needs in the future. What do you think?
> >> > > > > > > > > > > >>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>> Best,
> >> > > > > > > > > > > >>>>>>>>>> Xuannan
> >> > > > > > > > > > > >>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>> On Aug 20, 2023, at 23:27, Jark Wu <
> >> > > > > imj...@gmail.com
> >> > > > > > > > > > > >> <mailto:
> >> > > > > > > > > > > >>>>>>>>>> imj...@gmail.com>> wrote:
> >> > > > > > > > > > > >>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>> Hi Xuannan,
> >> > > > > > > > > > > >>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>> Thanks for opening this discussion.
> >> > > > > > > > > > > >>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>> This current proposal may work in the
> >> > mentioned
> >> > > > > > > watermark
> >> > > > > > > > > > > >> cases.
> >> > > > > > > > > > > >>>>>>>>>>> However, it seems this is not a general
> >> > > solution
> >> > > > > for
> >> > > > > > > > > sources
> >> > > > > > > > > > > >> to
> >> > > > > > > > > > > >>>>>>>> determine
> >> > > > > > > > > > > >>>>>>>>>>> "isProcessingBacklog".
> >> > > > > > > > > > > >>>>>>>>>>> From my point of view, there are 3
> >> > limitations
> >> > > of
> >> > > > > the
> >> > > > > > > > > current
> >> > > > > > > > > > > >>>>>> proposal:
> >> > > > > > > > > > > >>>>>>>>>>> 1. It doesn't cover jobs that don't have watermark/event-time
> >> > > > > > > > > > > >>>>>>>>>>> defined, for example streaming join and unbounded aggregate. We may
> >> > > > > > > > > > > >>>>>>>>>>> still need to figure out solutions for them.
> >> > > > > > > > > > > >>>>>>>>>>> 2. Watermark lag cannot be trusted, because it increases without
> >> > > > > > > > > > > >>>>>>>>>>> bound if no data is generated in Kafka. But in this case, there is
> >> > > > > > > > > > > >>>>>>>>>>> no backlog at all.
> >> > > > > > > > > > > >>>>>>>>>>> 3. Watermark lag hardly reflects the amount of backlog. Whether the
> >> > > > > > > > > > > >>>>>>>>>>> watermark lag is 1 day, 1 hour, or 1 second, there is possibly only
> >> > > > > > > > > > > >>>>>>>>>>> 1 pending record there, which means no backlog at all.
> >> > > > > > > > > > > >>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>> Therefore, IMO, watermark may not be the ideal metric to determine
> >> > > > > > > > > > > >>>>>>>>>>> "isProcessingBacklog".
> >> > > > > > > > > > > >>>>>>>>>>> What we need is something that reflects
> >> the
> >> > > > number
> >> > > > > of
> >> > > > > > > > > records
> >> > > > > > > > > > > >>>>>>>> unprocessed
> >> > > > > > > > > > > >>>>>>>>>>> by the job.
> >> > > > > > > > > > > >>>>>>>>>>> Actually, that is the "pendingRecords"
> >> metric
> >> > > > > > proposed
> >> > > > > > > in
> >> > > > > > > > > > > >>>> FLIP-33 and
> >> > > > > > > > > > > >>>>>>>> has
> >> > > > > > > > > > > >>>>>>>>>>> been implemented by Kafka source.
> >> > > > > > > > > > > >>>>>>>>>>> Did you consider using "pendingRecords"
> >> > metric
> >> > > to
> >> > > > > > > > determine
> >> > > > > > > > > > > >>>>>>>>>>> "isProcessingBacklog"?
> >> > > > > > > > > > > >>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>> Best,
> >> > > > > > > > > > > >>>>>>>>>>> Jark
> >> > > > > > > > > > > >>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> >> > > > > > > > > > > >>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>> On Tue, 15 Aug 2023 at 12:04, Xintong Song <tonysong...@gmail.com> wrote:
> >> > > > > > > > > > > >>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>> Sounds good to me.
> >> > > > > > > > > > > >>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>> It is true that, if we are introducing the generalized watermark,
> >> > > > > > > > > > > >>>>>>>>>>>> there will be other watermark-related concepts / configurations that
> >> > > > > > > > > > > >>>>>>>>>>>> need to be updated anyway.
> >> > > > > > > > > > > >>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>> Best,
> >> > > > > > > > > > > >>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>> Xintong
> >> > > > > > > > > > > >>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>> On Tue, Aug 15, 2023 at 11:30 AM Xuannan Su <suxuanna...@gmail.com> wrote:
> >> > > > > > > > > > > >>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>> Hi Xintong,
> >> > > > > > > > > > > >>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>> Thank you for your suggestion.
> >> > > > > > > > > > > >>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>> After considering the idea of using a general configuration key, I
> >> > > > > > > > > > > >>>>>>>>>>>>> think it may not be a good idea for the reasons below.
> >> > > > > > > > > > > >>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>> While I agree that using a more general configuration key provides us
> >> > > > > > > > > > > >>>>>>>>>>>>> with the flexibility to switch to other approaches to calculate the
> >> > > > > > > > > > > >>>>>>>>>>>>> lag in the future, the downside is that it may cause confusion for
> >> > > > > > > > > > > >>>>>>>>>>>>> users. We currently have fetchEventTimeLag, emitEventTimeLag, and
> >> > > > > > > > > > > >>>>>>>>>>>>> watermarkLag in the source, and it is not clear which specific lag we
> >> > > > > > > > > > > >>>>>>>>>>>>> are referring to. With the potential introduction of the Generalized
> >> > > > > > > > > > > >>>>>>>>>>>>> Watermark mechanism in the future, if I understand correctly, a
> >> > > > > > > > > > > >>>>>>>>>>>>> watermark won't necessarily need to be a timestamp. I am concerned
> >> > > > > > > > > > > >>>>>>>>>>>>> that the general configuration key may not be enough to cover all the
> >> > > > > > > > > > > >>>>>>>>>>>>> use cases, and that we will need to introduce a general way to
> >> > > > > > > > > > > >>>>>>>>>>>>> determine the backlog status regardless.
> >> > > > > > > > > > > >>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>> For the reasons above, I prefer introducing the configuration as is,
> >> > > > > > > > > > > >>>>>>>>>>>>> and changing it later with a deprecation process or migration
> >> > > > > > > > > > > >>>>>>>>>>>>> process. What do you think?
> >> > > > > > > > > > > >>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>> Best,
> >> > > > > > > > > > > >>>>>>>>>>>>> Xuannan
> >> > > > > > > > > > > >>>>>>>>>>>>> On Aug 14, 2023, 14:09 +0800, Xintong Song <tonysong...@gmail.com> wrote:
> >> > > > > > > > > > > >>>>>>>>>>>>>> Thanks for the explanation.
> >> > > > > > > > > > > >>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>> I wonder if it makes sense to not expose this detail via the
> >> > > > > > > > > > > >>>>>>>>>>>>>> configuration option. To be specific, I suggest not mentioning the
> >> > > > > > > > > > > >>>>>>>>>>>>>> "watermark" keyword in the configuration key and description.
> >> > > > > > > > > > > >>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>> - From the users' perspective, I think they only need to know that,
> >> > > > > > > > > > > >>>>>>>>>>>>>> when there's a lag higher than the given threshold, Flink will
> >> > > > > > > > > > > >>>>>>>>>>>>>> consider latency of individual records as less important and
> >> > > > > > > > > > > >>>>>>>>>>>>>> prioritize throughput over it. They don't really need the details of
> >> > > > > > > > > > > >>>>>>>>>>>>>> how the lags are calculated.
> >> > > > > > > > > > > >>>>>>>>>>>>>> - For the internal implementation, I also think using watermark lags
> >> > > > > > > > > > > >>>>>>>>>>>>>> is a good idea, for the reasons you've already mentioned. However,
> >> > > > > > > > > > > >>>>>>>>>>>>>> it's not the only possible option. Hiding this detail from users
> >> > > > > > > > > > > >>>>>>>>>>>>>> would give us the flexibility to switch to other approaches if
> >> > > > > > > > > > > >>>>>>>>>>>>>> needed in the future.
> >> > > > > > > > > > > >>>>>>>>>>>>>> - We are currently working on designing the ProcessFunction API
> >> > > > > > > > > > > >>>>>>>>>>>>>> (consider it as a DataStream API V2). There's an idea to introduce a
> >> > > > > > > > > > > >>>>>>>>>>>>>> Generalized Watermark mechanism, where basically the watermark can
> >> > > > > > > > > > > >>>>>>>>>>>>>> be anything that needs to travel along the data-flow with certain
> >> > > > > > > > > > > >>>>>>>>>>>>>> alignment strategies, and event time watermark would be one specific
> >> > > > > > > > > > > >>>>>>>>>>>>>> case of it. This is still an idea and has not been discussed and
> >> > > > > > > > > > > >>>>>>>>>>>>>> agreed on by the community, and we are preparing a FLIP for it. But
> >> > > > > > > > > > > >>>>>>>>>>>>>> if we are going for it, the concept "watermark-lag-threshold" could
> >> > > > > > > > > > > >>>>>>>>>>>>>> be ambiguous.
> >> > > > > > > > > > > >>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>> I do not intend to block the FLIP on this. I'd also be fine with
> >> > > > > > > > > > > >>>>>>>>>>>>>> introducing the configuration as is, and changing it later, if
> >> > > > > > > > > > > >>>>>>>>>>>>>> needed, with a regular deprecation and migration process. Just
> >> > > > > > > > > > > >>>>>>>>>>>>>> making my suggestions.
> >> > > > > > > > > > > >>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>> Best,
> >> > > > > > > > > > > >>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>> Xintong
> >> > > > > > > > > > > >>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>> On Mon, Aug 14, 2023 at 12:00 PM Xuannan Su <suxuanna...@gmail.com> wrote:
> >> > > > > > > > > > > >>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>>> Hi Xintong,
> >> > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>>> Thanks for the reply.
> >> > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>>> I have considered using the timestamp in the records to determine
> >> > > > > > > > > > > >>>>>>>>>>>>>>> the backlog status, and decided to use the watermark in the end. By
> >> > > > > > > > > > > >>>>>>>>>>>>>>> definition, the watermark is the time progress indication in the
> >> > > > > > > > > > > >>>>>>>>>>>>>>> data stream. It indicates that the stream's event time has
> >> > > > > > > > > > > >>>>>>>>>>>>>>> progressed to some specific time. On the other hand, the timestamp
> >> > > > > > > > > > > >>>>>>>>>>>>>>> in the records is usually used to generate the watermark.
> >> > > > > > > > > > > >>>>>>>>>>>>>>> Therefore, it appears more appropriate and intuitive to calculate
> >> > > > > > > > > > > >>>>>>>>>>>>>>> the event time lag by watermark and determine the backlog status.
> >> > > > > > > > > > > >>>>>>>>>>>>>>> And by using the watermark, we can easily deal with out-of-order
> >> > > > > > > > > > > >>>>>>>>>>>>>>> data and idleness of the data.
> >> > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>>> Please let me know if you have further questions.
> >> > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>>> Best,
> >> > > > > > > > > > > >>>>>>>>>>>>>>> Xuannan
> >> > > > > > > > > > > >>>>>>>>>>>>>>> On Aug 10, 2023, 20:23 +0800, Xintong Song <tonysong...@gmail.com> wrote:
> >> > > > > > > > > > > >>>>>>>>>>>>>>>> Thanks for preparing the FLIP, Xuannan.
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>>>> +1 in general.
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>>>> A quick question: could you explain why we are relying on the
> >> > > > > > > > > > > >>>>>>>>>>>>>>>> watermark for emitting the record attribute? Why not use
> >> > > > > > > > > > > >>>>>>>>>>>>>>>> timestamps in the records? I don't see any concern in using
> >> > > > > > > > > > > >>>>>>>>>>>>>>>> watermarks. Just wondering if there are any deep considerations
> >> > > > > > > > > > > >>>>>>>>>>>>>>>> behind this.
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>>>> Best,
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>>>> Xintong
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>>>> On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su <suxuanna...@gmail.com> wrote:
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>> Hi all,
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>> I am opening this thread to discuss FLIP-328: Allow source
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>> operators to determine isProcessingBacklog based on watermark
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>> lag [1]. We had several discussions with Dong Lin about the
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>> design, and thanks for all the valuable advice.
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>> The FLIP targets the use-case where users want to run a Flink
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>> job to backfill historical data in a high-throughput manner and
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>> then continue processing real-time data with low latency.
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>> Building upon the backlog concept introduced in FLIP-309 [2],
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>> this proposal enables sources to report their status of
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>> processing backlog based on the watermark lag.
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>> We would greatly appreciate any comments or feedback you may
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>> have on this proposal.
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>> Best,
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>> Xuannan
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> >> > > > > > > > > > > >>>>>>>>>>>
> >> > > > > > > > > > > >>>>>>>>>>>>>>>>> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
>
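
To make the trade-off debated in this thread concrete, here is a minimal sketch in plain Python (it is not Flink code and does not use any Flink API). It contrasts the two candidate signals for "isProcessingBacklog": a watermark-lag threshold, as FLIP-328 proposes, and a "pendingRecords" threshold, as Jark suggests. The names SourceSnapshot, backlog_by_watermark_lag and backlog_by_pending_records are hypothetical, and watermark lag is assumed here to be wall-clock time minus the current watermark.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SourceSnapshot:
    """Hypothetical view of one source's metrics (illustration only)."""
    now_ms: int                      # current wall-clock time
    watermark_ms: Optional[int]      # None if no watermark/event time is defined
    pending_records: Optional[int]   # None if the source does not report it


def backlog_by_watermark_lag(s: SourceSnapshot, threshold_ms: int) -> bool:
    # Assumption: lag = wall-clock time minus the current watermark.
    if s.watermark_ms is None:
        return False  # limitation 1: nothing to measure without event time
    return s.now_ms - s.watermark_ms > threshold_ms


def backlog_by_pending_records(s: SourceSnapshot, threshold_records: int) -> bool:
    if s.pending_records is None:
        return False  # the metric is optional, so it may be missing
    return s.pending_records > threshold_records


# Limitation 2: an idle topic. The watermark is one hour old, yet nothing
# is waiting to be read, so the two signals disagree.
idle = SourceSnapshot(now_ms=3_600_000, watermark_ms=0, pending_records=0)
print(backlog_by_watermark_lag(idle, threshold_ms=60_000))          # True
print(backlog_by_pending_records(idle, threshold_records=1_000))    # False
```

The idle-topic case is the one where the two signals diverge: the lag-based check reports a backlog although no records are pending. The sketch also shows the cost of the alternative, since a source that does not report pending records yields no signal at all.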
