Hi Piotr,

Thanks for the comments. Let me try to explain the difference below.

Overall, the two competing options differ in how an invocation of
`#setIsProcessingBacklog(false)` affects the backlog status for the given
source (corresponding to the SplitEnumeratorContext instance on which this
method is invoked).

- With my approach, setIsProcessingBacklog(false) merely unsets the effect of
any previous invocation of setIsProcessingBacklog(..) on the given source,
without necessarily forcing the source's backlog status to be false.
- With Jark’s approach, setIsProcessingBacklog(false) forces the source's
backlog status to be false.

There is no practical difference between these two options under FLIP-309
alone. However, once we introduce an additional strategy (e.g. a job-level
config) to determine the backlog status in FLIP-328, there will be subtle but
important differences between them.

More specifically, let’s say we want to introduce a job-level config such
as `pipeline.backlog.watermark-lag-threshold`, as mentioned in FLIP-328:

- With Jark’s approach, if MySQL CDC invokes setIsProcessingBacklog(false)
at the beginning of the “unbounded phase”, then that effectively means
isProcessingBacklog=false even if the watermark lag exceeds the configured
threshold, which prevents the job-level config from taking effect during the
“unbounded phase”.
- With my approach, even if MySQL CDC invokes setIsProcessingBacklog(false)
at the beginning of the “unbounded phase”, the source can still have
isProcessingBacklog=true when watermark lag is too high.
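
To make the difference concrete, here is a minimal Java sketch of a single source's backlog status under each option. It is an illustrative model, not Flink code: the class and method names are hypothetical, the source's last setIsProcessingBacklog(..) call is modelled as an `Optional<Boolean>` (empty if never invoked), and under Jark's approach an explicit call is assumed to take precedence over the watermark-lag rule.

```java
import java.util.Optional;

// Hypothetical model of one source's backlog status; not a Flink API.
class BacklogSemantics {

    // Option-1 (Dong's approach): every rule can only say "true";
    // setIsProcessingBacklog(false) merely withdraws the source's own "true".
    static boolean option1(Optional<Boolean> lastCall, long watermarkLagMs, long thresholdMs) {
        boolean sourceSaysBacklog = lastCall.orElse(false);
        boolean lagSaysBacklog = watermarkLagMs > thresholdMs;
        return sourceSaysBacklog || lagSaysBacklog;
    }

    // Option-2 (Jark's approach, assumed precedence): an explicit call decides,
    // so setIsProcessingBacklog(false) forces false regardless of the watermark lag.
    static boolean option2(Optional<Boolean> lastCall, long watermarkLagMs, long thresholdMs) {
        return lastCall.orElse(watermarkLagMs > thresholdMs);
    }

    public static void main(String[] args) {
        // MySQL CDC entered its unbounded phase and called setIsProcessingBacklog(false),
        // but the watermark lags 10 minutes behind and the job-level threshold is 5 minutes.
        Optional<Boolean> lastCall = Optional.of(false);
        long lagMs = 10 * 60_000L;
        long thresholdMs = 5 * 60_000L;
        System.out.println("option-1: " + option1(lastCall, lagMs, thresholdMs)); // true
        System.out.println("option-2: " + option2(lastCall, lagMs, thresholdMs)); // false
    }
}
```

The two models return the same value whenever the watermark lag does not exceed the threshold, which is consistent with the options being indistinguishable under FLIP-309 alone.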

Would this clarify the difference between these two options?

Regards,
Dong


On Mon, Sep 25, 2023 at 5:15 PM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:

> Hi Jark and Dong,
>
> I'm a bit confused about the difference between the two competing options.
> Could one of you elaborate on the difference between:
> > 2) The semantics of `#setIsProcessingBacklog(false)` is that it overrides
> > the effect of the previous invocation (if any) of
> > `#setIsProcessingBacklog(true)` on the given source instance.
>
> and
>
> > 2) The semantics of `#setIsProcessingBacklog(false)` is that the given
> > source instance will have backlog=false.
>
> ?
>
> Best,
> Piotrek
>
> On Thu, Sep 21, 2023 at 3:28 PM Dong Lin <lindon...@gmail.com> wrote:
>
> > Hi all,
> >
> > Jark and I discussed this FLIP offline and I will summarize our discussion
> > below. It would be great if you could share your opinion on the proposed
> > options.
> >
> > Regarding the target use-cases:
> > - We both agreed that MySQL CDC should have backlog=true when watermarkLag
> > is large during the binlog phase.
> > - Dong argued that other streaming sources with watermarkLag defined (e.g.
> > Kafka) should also have backlog=true when watermarkLag is large. The
> > pros/cons discussion below assumes this use-case needs to be supported.
> >
> > The 1st option is what is currently proposed in FLIP-328, with the
> > following key characteristics:
> > 1) There is one job-level config (i.e.
> > pipeline.backlog.watermark-lag-threshold) that applies to all sources with
> > the watermarkLag metric defined.
> > 2) The semantics of `#setIsProcessingBacklog(false)` is that it overrides
> > the effect of the previous invocation (if any) of
> > `#setIsProcessingBacklog(true)` on the given source instance.
> >
> > The 2nd option is what Jark proposed in this email thread, with the
> > following key characteristics:
> > 1) Add a source-specific config (both a Java API and a SQL source property)
> > to every source for which we want to set the backlog status based on the
> > watermarkLag metric. For example, we might add separate Java APIs
> > `#setWatermarkLagThreshold` for MySQL CDC source, HybridSource,
> > KafkaSource, PulsarSource etc.
> > 2) The semantics of `#setIsProcessingBacklog(false)` is that the given
> > source instance will have backlog=false.
> >
> > Here are the key pros/cons of these two options.
> >
> > Cons of the 1st option:
> > 1) The semantics of `#setIsProcessingBacklog(false)` is harder to
> > understand for Flink operator developers than the corresponding semantics
> > in option-2.
> >
> > Cons of the 2nd option:
> > 1) More work for end-users. For a job with multiple sources that need to be
> > configured with a watermark lag threshold, users need to specify multiple
> > configs (one for each source) instead of specifying one job-level config.
> >
> > 2) More work for Flink operator developers. Overall there are more public
> > APIs (one Java API and one SQL property for each source that needs to
> > determine backlog based on watermark) exposed to end users. This also adds
> > more burden for the Flink community to maintain these APIs.
> >
> > 3) It would be hard (e.g. require a backward-incompatible API change) to
> > extend the Flink runtime to support a job-level config that determines the
> > backlog status in the future (e.g. support the
> > pipeline.backlog.watermark-lag-threshold in option-1). This is because an
> > existing source operator's code might have hardcoded an invocation of
> > `#setIsProcessingBacklog(false)`, which means the backlog status must be
> > set to false, which prevents the Flink runtime from setting backlog=true
> > when a new strategy is triggered.
> >
> > Overall, I am still inclined to choose option-1 because it is more
> > extensible and simpler to use in the long term when we want to support/use
> > multiple sources whose backlog status can change based on the watermark
> > lag. While option-1's `#setIsProcessingBacklog` is a bit harder to
> > understand than option-2's, I think this overhead/cost is worthwhile as it
> > makes end-users' lives easier in the long term.
> >
> > Jark: thank you for taking the time to review this FLIP. Please feel free
> > to comment if I missed anything in the pros/cons above.
> >
> > Jark and I have not reached agreement on which option is better. It would
> > be really helpful if we could get more comments on these options.
> >
> > Thanks,
> > Dong
> >
> >
> > On Tue, Sep 19, 2023 at 11:26 AM Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hi Jark,
> > >
> > > Thanks for the reply. Please see my comments inline.
> > >
> > > On Tue, Sep 19, 2023 at 10:12 AM Jark Wu <imj...@gmail.com> wrote:
> > >
> > >> Hi Dong,
> > >>
> > >> Sorry for the late reply.
> > >>
> > >> > The rationale is that if there is any strategy that is triggered and
> > >> > says backlog=true, then job's backlog should be true. Otherwise, the
> > >> > job's backlog status is false.
> > >>
> > >> I'm quite confused about this. Does that mean, if the source is in the
> > >> changelog phase, the source has to continuously invoke
> > >> "setIsProcessingBacklog(true)" (in an infinite loop?). Otherwise,
> > >> the job's backlog status would be set to false by the framework?
> > >>
> > >
> > > No, the source would not have to continuously invoke
> > > setIsProcessingBacklog(true) in an infinite loop.
> > >
> > > Actually, I am not sure where the idea that "the job's backlog status
> > > would be set to false by the framework" comes from. Could you explain?
> > >
> > > I guess it might be useful to provide a complete overview of how
> > > setIsProcessingBacklog(...) and pipeline.backlog.watermark-lag-threshold
> > > work together to determine the overall job's backlog status. Let me
> > > explain it below.
> > >
> > > Here is the semantics/behavior of setIsProcessingBacklog(..):
> > > - This method is invoked on a per-source basis.
> > > - This method can be invoked multiple times with true/false as its input
> > > parameter.
> > > - For a given source, the last invocation of this method overwrites the
> > > effect of earlier invocations of this method.
> > > - For a given source, if this method has been invoked at least once and
> > > the last invocation of this method has isProcessingBacklog = true, then it
> > > is guaranteed that the backlog status of this source is set to true.
> > >
> > > Here is the semantics/behavior of pipeline.backlog.watermark-lag-threshold:
> > > - This config is specified at the job level and applies to every source
> > > included in this job.
> > > - For a given source, if its watermarkLag metric is available (see
> > > FLIP-33) and watermarkLag > watermark-lag-threshold, then it is guaranteed
> > > that the backlog status of this source is set to true.
> > >
> > > Here is how the source's backlog status is determined: if a rule specified
> > > above says the source's backlog status should be true, then the source's
> > > backlog status is set to true. Otherwise, it is set to false.
> > >
> > > Here is how the job's backlog status is determined: if there exists a
> > > source that is currently running and the source's backlog status is set to
> > > true, then the job's backlog status is set to true. Otherwise, it is set
> > > to false.
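
The rules above can be condensed into a small Java model. This is a hypothetical sketch for illustration only: `BacklogStatusModel` and its fields are not Flink APIs, the per-source state is assumed to be tracked by the runtime, and `watermarkLagThresholdMs` stands in for pipeline.backlog.watermark-lag-threshold.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of how per-source rules and the job-level threshold combine.
class BacklogStatusModel {

    // State the runtime is assumed to track for each source.
    static final class SourceState {
        Optional<Boolean> lastSetIsProcessingBacklog = Optional.empty(); // empty: never invoked
        Optional<Long> watermarkLagMs = Optional.empty();                // empty: metric unavailable
        boolean running = true;
    }

    private final long watermarkLagThresholdMs;
    private final Map<String, SourceState> sources = new LinkedHashMap<>();

    BacklogStatusModel(long watermarkLagThresholdMs) {
        this.watermarkLagThresholdMs = watermarkLagThresholdMs;
    }

    SourceState source(String name) {
        return sources.computeIfAbsent(name, n -> new SourceState());
    }

    boolean sourceBacklog(String name) {
        return isBacklogged(source(name));
    }

    // A source is backlogged iff at least one rule says "true"; no rule ever says "false".
    private boolean isBacklogged(SourceState s) {
        boolean bySetter = s.lastSetIsProcessingBacklog.orElse(false);
        boolean byLag = s.watermarkLagMs.map(lag -> lag > watermarkLagThresholdMs).orElse(false);
        return bySetter || byLag;
    }

    // The job is backlogged iff some running source is backlogged.
    boolean jobBacklog() {
        return sources.values().stream().anyMatch(s -> s.running && isBacklogged(s));
    }

    public static void main(String[] args) {
        BacklogStatusModel job = new BacklogStatusModel(5 * 60_000L); // 5-minute threshold
        job.source("mysql-cdc").lastSetIsProcessingBacklog = Optional.of(true); // snapshot phase
        job.source("kafka").watermarkLagMs = Optional.of(1_000L);
        System.out.println(job.jobBacklog()); // true: MySQL CDC asserts backlog

        job.source("mysql-cdc").lastSetIsProcessingBacklog = Optional.of(false); // binlog phase
        job.source("mysql-cdc").watermarkLagMs = Optional.of(30_000L);
        System.out.println(job.jobBacklog()); // false: no rule says true

        job.source("kafka").watermarkLagMs = Optional.of(10 * 60_000L);
        System.out.println(job.jobBacklog()); // true: Kafka's lag exceeds the threshold
    }
}
```

Because each rule in this model can only push a source towards true, adding a rule such as the job-level threshold cannot contradict a source's own setIsProcessingBacklog(..) calls.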
> > >
> > > Hopefully this can help clarify the behavior. If needed, we can update the
> > > relevant doc (e.g. the setIsProcessingBacklog() Java doc) to make the
> > > semantics above clearer for Flink users.
> > >
> > > And I would be happy to discuss/explain this design offline when you have
> > > time.
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > >>
> > >> Best,
> > >> Jark
> > >>
> > >> On Tue, 19 Sept 2023 at 09:13, Dong Lin <lindon...@gmail.com> wrote:
> > >>
> > >> > Hi Jark,
> > >> >
> > >> > Do you have time to comment on whether the current design looks good?
> > >> >
> > >> > I plan to start voting in 3 days if there is no follow-up comment.
> > >> >
> > >> > Thanks,
> > >> > Dong
> > >> >
> > >> >
> > >> > On Fri, Sep 15, 2023 at 2:01 PM Jark Wu <imj...@gmail.com> wrote:
> > >> >
> > >> > > Hi Dong,
> > >> > >
> > >> > > > Note that we cannot simply enforce the semantics of "any invocation
> > >> > > > of setIsProcessingBacklog(false) will set the job's backlog status to
> > >> > > > false". Suppose we have a job with two operators, where operatorA
> > >> > > > invokes setIsProcessingBacklog(false) and operatorB invokes
> > >> > > > setIsProcessingBacklog(true). There will be a conflict if we use the
> > >> > > > semantics of "any invocation of setIsProcessingBacklog(false) will
> > >> > > > set the job's backlog status to false".
> > >> > >
> > >> > > So it should set the job's backlog status to false if the job has only
> > >> > > a single source, right?
> > >> > >
> > >> > > Could you elaborate on the behavior if there is a job with a single
> > >> > > source, and the watermark lag exceeds the configured value (which
> > >> > > should set backlog to true?), but the source invokes
> > >> > > "setIsProcessingBacklog(false)"? Or the inverse case: the source
> > >> > > invokes "setIsProcessingBacklog(false)" first, but the watermark lag
> > >> > > exceeds the configured value.
> > >> > >
> > >> > > This is the conflict I'm concerned about.
> > >> > >
> > >> > > Best,
> > >> > > Jark
> > >> > >
> > >> > > On Fri, 15 Sept 2023 at 12:00, Dong Lin <lindon...@gmail.com> wrote:
> > >> > >
> > >> > > > Hi Jark,
> > >> > > >
> > >> > > > Please see my comments inline.
> > >> > > >
> > >> > > > On Fri, Sep 15, 2023 at 10:35 AM Jark Wu <imj...@gmail.com> wrote:
> > >> > > >
> > >> > > > > Hi Dong,
> > >> > > > >
> > >> > > > > Please see my comments inline below.
> > >> > > >
> > >> > > >
> > >> > > > > > Hmm.. can you explain what you mean by "different watermark
> > >> > > > > > delay definitions for each source"?
> > >> > > > >
> > >> > > > > For example, "table1" defines a watermark with delay 5 seconds,
> > >> > > > > "table2" defines a watermark with delay 10 seconds. They have
> > >> > > > > different watermark delay definitions. So it is also reasonable that
> > >> > > > > they have different watermark lag definitions, e.g., "table1" allows
> > >> > > > > "10mins" and "table2" allows "20mins".
> > >> > > > >
> > >> > > >
> > >> > > > I think the watermark delay you mentioned above is conceptually /
> > >> > > > fundamentally different from the watermark-lag-threshold proposed
> > >> > > > in this FLIP.
> > >> > > >
> > >> > > > It might be useful to revisit the semantics of these two concepts:
> > >> > > > - watermark delay is used to account for the maximum amount of
> > >> > > > out-of-orderliness that users expect (or are willing to wait for)
> > >> > > > for records from a given source.
> > >> > > > - watermark-lag-threshold is used to define when processing latency
> > >> > > > is no longer important (e.g. because data is already stale).
> > >> > > >
> > >> > > > Even though users might expect different out-of-orderliness for
> > >> > > > different sources, users do not necessarily have different
> > >> > > > definitions / thresholds for when a record is considered "already
> > >> > > > stale".
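
A short worked example of why the two quantities differ in scale. It assumes a bounded-out-of-orderness watermark (the largest event time seen minus the configured delay) and takes watermarkLag to be wall-clock time minus the current watermark; the class name and the numbers are illustrative only.

```java
// Hypothetical arithmetic: watermark delay vs. watermark lag.
class DelayVsLag {

    // Bounded-out-of-orderness watermark: the largest event time seen, minus the delay.
    static long watermark(long maxEventTimeMs, long delayMs) {
        return maxEventTimeMs - delayMs;
    }

    // Assumed definition of watermarkLag: how far the watermark trails the wall clock.
    static long watermarkLag(long nowMs, long watermarkMs) {
        return nowMs - watermarkMs;
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        long maxEventTime = now - 120_000L; // both tables are reading 2-minute-old records
        long lag1 = watermarkLag(now, watermark(maxEventTime, 5_000L));  // table1, 5 s delay: 125000
        long lag2 = watermarkLag(now, watermark(maxEventTime, 10_000L)); // table2, 10 s delay: 130000
        long staleAfterMs = 10 * 60_000L; // one job-level threshold: 10 minutes
        System.out.println((lag1 > staleAfterMs) + " " + (lag2 > staleAfterMs)); // false false
    }
}
```

The per-table delay shifts the lag by only a few seconds, while the staleness threshold is measured in minutes, so a single threshold can reasonably serve both tables.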
> > >> > > >
> > >> > > >
> > >> > > > >
> > >> > > > > > I think there is probably a misunderstanding here. FLIP-309 does
> > >> > > > > > NOT directly specify when backlog is false. It is intentionally
> > >> > > > > > specified in such a way that there will not be any conflict
> > >> > > > > > between these rules.
> > >> > > > >
> > >> > > > > Do you mean FLIP-309 doesn't allow to specify backlog to be false?
> > >> > > > > Is this mentioned in FLIP-309? This is completely different from
> > >> > > > > what I
> > >> > > > >
> > >> > > >
> > >> > > > Can you explain what you mean by "allow to specify backlog to be
> > >> > > > false"?
> > >> > > >
> > >> > > > If you mean "can invoke setIsProcessingBacklog(false)", then
> > >> > > > FLIP-309 supports doing this.
> > >> > > >
> > >> > > > If you mean "any invocation of setIsProcessingBacklog(false) will
> > >> > > > set the job's backlog status to false", then FLIP-309 does not
> > >> > > > support this. I believe the existing Java doc of this API and
> > >> > > > FLIP-309 are compatible with this explanation.
> > >> > > >
> > >> > > > Note that we cannot simply enforce the semantics of "any invocation
> > >> > > > of setIsProcessingBacklog(false) will set the job's backlog status to
> > >> > > > false". Suppose we have a job with two operators, where operatorA
> > >> > > > invokes setIsProcessingBacklog(false) and operatorB invokes
> > >> > > > setIsProcessingBacklog(true). There will be a conflict if we use the
> > >> > > > semantics of "any invocation of setIsProcessingBacklog(false) will
> > >> > > > set the job's backlog status to false".
> > >> > > >
> > >> > > > Would this answer your question?
> > >> > > >
> > >> > > > Best,
> > >> > > > Dong
> > >> > > >
> > >> > > >
> > >> > > > > understand. The API interface "ctx.setIsProcessingBacklog(boolean)"
> > >> > > > > allows users to invoke "setIsProcessingBacklog(false)". And FLIP-309
> > >> > > > > also says "MySQL CDC source should report isProcessingBacklog=false
> > >> > > > > at the beginning of the changelog stage." If not, maybe we need to
> > >> > > > > revisit FLIP-309.
> > >> > > >
> > >> > > >
> > >> > > > > Best,
> > >> > > > > Jark
> > >> > > > >
> > >> > > > >
> > >> > > > >
> > >> > > > > On Fri, 15 Sept 2023 at 08:41, Dong Lin <lindon...@gmail.com> wrote:
> > >> > > > >
> > >> > > > > > Hi Jark,
> > >> > > > > >
> > >> > > > > > Do you have any follow-up comment?
> > >> > > > > >
> > >> > > > > > My gut feeling is that if we need to support per-source
> > >> > > > > > watermark lag specification in the future (not sure we have a
> > >> > > > > > use-case for this right now), we can add such a config with a
> > >> > > > > > follow-up FLIP. The job-level config will still be useful as it
> > >> > > > > > makes users' configuration simpler for common scenarios.
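
If a per-source threshold were added later in a follow-up FLIP, one plausible resolution order is sketched below: a per-source value, when present, overrides the job-level default. This is purely hypothetical; the class and the override order are assumptions, not part of FLIP-328.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical: optional per-source thresholds layered over one job-level default.
class ThresholdResolver {
    private final Optional<Long> jobLevelThresholdMs;
    private final Map<String, Long> perSourceThresholdMs;

    ThresholdResolver(Optional<Long> jobLevelThresholdMs, Map<String, Long> perSourceThresholdMs) {
        this.jobLevelThresholdMs = jobLevelThresholdMs;
        this.perSourceThresholdMs = perSourceThresholdMs;
    }

    // A per-source value wins if set; otherwise fall back to the job-level value (if any).
    Optional<Long> thresholdFor(String sourceName) {
        Long perSource = perSourceThresholdMs.get(sourceName);
        return perSource != null ? Optional.of(perSource) : jobLevelThresholdMs;
    }

    public static void main(String[] args) {
        ThresholdResolver resolver =
                new ThresholdResolver(Optional.of(600_000L), Map.of("orders", 1_200_000L));
        System.out.println(resolver.thresholdFor("orders"));   // Optional[1200000]
        System.out.println(resolver.thresholdFor("payments")); // Optional[600000]
    }
}
```

With this layering, users who need only one threshold keep setting a single job-level value, while the rare job that needs different thresholds can override individual sources.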
> > >> > > > > >
> > >> > > > > > If it is OK, can we agree to make incremental progress for Flink
> > >> > > > > > and start a voting thread for this FLIP?
> > >> > > > > >
> > >> > > > > > Thanks,
> > >> > > > > > Dong
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > On Mon, Sep 11, 2023 at 4:41 PM Jark Wu <imj...@gmail.com> wrote:
> > >> > > > > >
> > >> > > > > > > Hi Dong,
> > >> > > > > > >
> > >> > > > > > > Please see my comments inline.
> > >> > > > > > >
> > >> > > > > > > > As a result, the proposed job-level config will be applied
> > >> > > > > > > > only in the changelog stage. So there is no difference between
> > >> > > > > > > > these two approaches in this particular case, right?
> > >> > > > > > >
> > >> > > > > > > How can the job-level config be applied ONLY in the changelog
> > >> > > > > > > stage? I think it is only possible if it is implemented by the
> > >> > > > > > > CDC source itself, because the framework doesn't know which stage
> > >> > > > > > > the source is in. Note that the CDC source may emit watermarks
> > >> > > > > > > with a very small lag in the snapshot stage, and the job-level
> > >> > > > > > > config may turn the backlog status into false.
> > >> > > > > > >
> > >> > > > > > > > On the other hand, per-source config will be necessary if users
> > >> > > > > > > > want to apply different watermark lag thresholds for different
> > >> > > > > > > > sources in the same job.
> > >> > > > > > >
> > >> > > > > > > We also have different watermark delay definitions for each
> > >> > > > > > > source, so I think it is also reasonable and necessary to have
> > >> > > > > > > different watermark lags.
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > > Each source can have its own rule that specifies when the
> > >> > > > > > > > backlog can be true (e.g. MySql CDC says the backlog should be
> > >> > > > > > > > true during the snapshot stage). And we can have a job-level
> > >> > > > > > > > config that specifies when the backlog should be true. Note
> > >> > > > > > > > that it is designed in such a way that none of these rules
> > >> > > > > > > > specify when the backlog should be false. That is why there is
> > >> > > > > > > > no conflict by definition.
> > >> > > > > > >
> > >> > > > > > > IIUC, FLIP-309 provides `setIsProcessingBacklog` to specify when
> > >> > > > > > > the backlog is true and when it is FALSE. This conflicts with the
> > >> > > > > > > job-level config as it will turn the status into true.
> > >> > > > > > >
> > >> > > > > > > > If I understand your comments correctly, you mean that we might
> > >> > > > > > > > have a Flink SQL DDL with user-defined watermark expressions.
> > >> > > > > > > > And users also want to set the backlog to true if the watermark
> > >> > > > > > > > generated by that user-specified expression exceeds a threshold.
> > >> > > > > > >
> > >> > > > > > > No. I mean the source may not support generating watermarks, so
> > >> > > > > > > the watermark expression is applied in a following operator
> > >> > > > > > > (instead of in the source operator). As a result, the watermark
> > >> > > > > > > lag threshold doesn't work in this case, which would confuse
> > >> > > > > > > users.
> > >> > > > > > >
> > >> > > > > > > > You are right that this is a limitation. However, this is only
> > >> > > > > > > > a short-term limitation which we added to make sure that we can
> > >> > > > > > > > focus on the capability to switch from backlog=true to
> > >> > > > > > > > backlog=false. In the future, we will remove this limitation
> > >> > > > > > > > and also support switching from backlog=false to backlog=true.
> > >> > > > > > >
> > >> > > > > > > I can understand it may be difficult to support runtime mode
> > >> > > > > > > switching back and forth. However, I think this should be a
> > >> > > > > > > limitation of FLIP-327, not FLIP-328. IIUC, FLIP-309 doesn't have
> > >> > > > > > > this limitation, right? I just don't understand: what is the
> > >> > > > > > > challenge in switching a flag?
> > >> > > > > > >
> > >> > > > > > > Best,
> > >> > > > > > > Jark
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > On Sun, 10 Sept 2023 at 19:44, Dong Lin <lindon...@gmail.com> wrote:
> > >> > > > > > >
> > >> > > > > > > > Hi Jark,
> > >> > > > > > > >
> > >> > > > > > > > Thanks for the comments. Please see my comments inline.
> > >> > > > > > > >
> > >> > > > > > > > On Sat, Sep 9, 2023 at 4:13 PM Jark Wu <imj...@gmail.com> wrote:
> > >> > > > > > > >
> > >> > > > > > > > > Hi Xuannan,
> > >> > > > > > > > >
> > >> > > > > > > > > I leave my comments inline.
> > >> > > > > > > > >
> > >> > > > > > > > > > In the case where a user wants to
> > >> > > > > > > > > > use a CDC source and also determine backlog status based on
> > >> > > > > > > > > > watermark lag, we still need to define the rule when that occurs
> > >> > > > > > > > >
> > >> > > > > > > > > This rule should be defined by the source itself (which knows
> > >> > > > > > > > > its backlog best), not by the framework. In the case of the CDC
> > >> > > > > > > > > source, it reports isBacklog=true during the snapshot stage, and
> > >> > > > > > > > > reports isBacklog=false during the changelog stage if
> > >> > > > > > > > > watermark-lag is within the threshold.
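
For comparison, the per-source rule Jark describes could look roughly like the following inside a CDC source. It is a hypothetical sketch: the `Phase` enum and the per-source threshold field are illustrative names, and the returned value is what the source would pass to setIsProcessingBacklog(..).

```java
// Hypothetical per-source rule: the CDC source derives its own backlog flag.
class CdcBacklogRule {

    enum Phase { SNAPSHOT, CHANGELOG }

    private final long watermarkLagThresholdMs; // hypothetical source-specific option

    CdcBacklogRule(long watermarkLagThresholdMs) {
        this.watermarkLagThresholdMs = watermarkLagThresholdMs;
    }

    // Value the source would report via setIsProcessingBacklog(..).
    boolean isProcessingBacklog(Phase phase, long watermarkLagMs) {
        if (phase == Phase.SNAPSHOT) {
            return true; // always backlog while reading the snapshot
        }
        // Changelog stage: backlog only while the watermark lag exceeds the threshold.
        return watermarkLagMs > watermarkLagThresholdMs;
    }

    public static void main(String[] args) {
        CdcBacklogRule rule = new CdcBacklogRule(5 * 60_000L);
        System.out.println(rule.isProcessingBacklog(Phase.SNAPSHOT, 0L));        // true
        System.out.println(rule.isProcessingBacklog(Phase.CHANGELOG, 30_000L));  // false
        System.out.println(rule.isProcessingBacklog(Phase.CHANGELOG, 600_000L)); // true
    }
}
```

Here the threshold lives inside the source, which is why this approach needs one such option per connector that wants watermark-lag-based backlog detection.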
> > >> > > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > > I am not sure I fully understand the difference between adding
> > >> > > > > > > > a job-level config vs. adding a per-source config.
> > >> > > > > > > >
> > >> > > > > > > > In the case of CDC, its watermark lag should be either
> > >> > > > > > > > undefined or really large in the snapshot stage. As a result,
> > >> > > > > > > > the proposed job-level config will be applied only in the
> > >> > > > > > > > changelog stage. So there is no difference between these two
> > >> > > > > > > > approaches in this particular case, right?
> > >> > > > > > > >
> > >> > > > > > > > There are two advantages of the job-level config over
> > >> > > > > > > > per-source config:
> > >> > > > > > > >
> > >> > > > > > > > 1) Configuration is simpler. For example, suppose a user has a
> > >> > > > > > > > Flink job that consumes records from multiple Kafka sources and
> > >> > > > > > > > wants to determine backlog status for these Kafka sources using
> > >> > > > > > > > the same watermark lag threshold; there is no need for users to
> > >> > > > > > > > repeatedly specify this threshold for each source.
> > >> > > > > > > >
> > >> > > > > > > > 2) There is a smaller number of public APIs overall. In
> > >> > > > > > > > particular, instead of repeatedly adding a
> > >> > > > > > > > setProcessingBacklogWatermarkLagThreshold() API for every source
> > >> > > > > > > > operator that has event-time watermark lag defined, we only need
> > >> > > > > > > > to add one job-level config. Fewer public APIs mean better
> > >> > > > > > > > simplicity and maintainability in general.
> > >> > > > > > > >
> > >> > > > > > > > On the other hand, per-source config will be necessary if users
> > >> > > > > > > > want to apply different watermark lag thresholds for different
> > >> > > > > > > > sources in the same job. Personally, I find this a bit
> > >> > > > > > > > counter-intuitive for users to specify different watermark lag
> > >> > > > > > > > thresholds in the same job.
> > >> > > > > > > >
> > >> > > > > > > > Do you think there is any real-world use-case that requires
> > >> > > > > > > > this? Could you provide a specific use-case where per-source
> > >> > > > > > > > config can provide an advantage over the job-level config?
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > > > I think it's not intuitive to combine it with the logical OR
> > >> > > > > > > > > operation. Even for the combination logic of backlog status
> > >> > > > > > > > > from different channels, FLIP-309 said it is "up to the
> > >> > > > > > > > > operator to determine its output records' isBacklog value" and
> > >> > > > > > > > > proposed 3 different strategies. Therefore, I think backlog
> > >> > > > > > > > > status from a single source should be up to the source.
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > > For both the job-level config and the per-source config, it is
> > >> > > > > > > > eventually up to the user to decide the computation logic of
> > >> > > > > > > > the backlog status. Whether this mechanism is implemented at the
> > >> > > > > > > > per-source level or framework level is probably more like an
> > >> > > > > > > > implementation detail.
> > >> > > > > > > >
> > >> > > > > > > > Eventually, I think the choice between these two approaches
> > >> > > > > > > > depends on whether we have any use-case for users to specify
> > >> > > > > > > > different watermark lag thresholds in the same job.
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > > >
> > >> > > > > > > > > IMO, a better API design is not about how to resolve
> > >> > > > > > > > > conflicts but about not introducing conflicts.
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > > Just to clarify, the current FLIP does not introduce any
> > >> > > > > > > > conflict. Each source can have its own rule that specifies when
> > >> > > > > > > > the backlog can be true (e.g. MySql CDC says the backlog should
> > >> > > > > > > > be true during the snapshot stage). And we can have a job-level
> > >> > > > > > > > config that specifies when the backlog should be true. Note
> > >> > > > > > > > that it is designed in such a way that none of these rules
> > >> > > > > > > > specify when the backlog should be false. That is why there is
> > >> > > > > > > > no conflict by definition.
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > > > Letting the source determine backlog status removes the
> > >> > > > > > > > > conflicts, and I don't see big disadvantages.
> > >> > > > > > > > >
> > >> > > > > > > > > > It should not confuse the user that
> > >> > > > > > > > > > DataStream#assignTimestampsAndWatermarks doesn't work with
> > >> > > > > > > > > > backlog.watermark-lag-threshold, as it is not a source.
> > >> > > > > > > > >
> > >> > > > > > > > > Hmm, so this configuration may confuse Flink SQL users,
> > >> > > > > > > > > because all watermarks are defined on the source DDL, but a
> > >> > > > > > > > > separate operator may be used to emit watermarks if the
> > >> > > > > > > > > source doesn't support emitting watermarks.
> > >> > > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > > If I understand your comments correctly, you mean that we might
> > >> > > > > > > > have a Flink SQL DDL with user-defined watermark expressions.
> > >> > > > > > > > And users also want to set the backlog to true if the watermark
> > >> > > > > > > > generated by that user-specified expression exceeds a threshold.
> > >> > > > > > > >
> > >> > > > > > > > That is a good point and use-case. I agree we should also cover
> > >> > > > > > > > this scenario. And we can update FLIP-328 to mention that the
> > >> > > > > > > > job-level config will also be applicable when the lag of the
> > >> > > > > > > > watermark derived from the Flink SQL DDL exceeds this threshold.
> > >> > > > > > > > Would this address your concern?
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > > >
> > >> > > > > > > > > > I think the description in the FLIP actually means the other
> > >> > > > > > > > > > way around, where the job can never switch back to batch mode
> > >> > > > > > > > > > once it has switched into streaming mode. This is to align
> > >> > > > > > > > > > with the current state of FLIP-327[1], where only switching
> > >> > > > > > > > > > from batch to stream mode is supported.
> > >> > > > > > > > >
> > >> > > > > > > > > This sounds like a limitation of FLIP-327 (where the execution
> > >> > > > > > > > > mode depends on the backlog status). But the backlog status
> > >> > > > > > > > > shouldn't have this limitation, because it is not only used for
> > >> > > > > > > > > execution mode switching.
> > >> > > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > > You are right that this is a limitation. However, this is only
> > >> > > > > > > > a short-term limitation which we added to make sure that we can
> > >> > > > > > > > focus on the capability to switch from backlog=true to
> > >> > > > > > > > backlog=false. In the future, we will remove this limitation
> > >> > > > > > > > and also support switching from backlog=false to backlog=true.
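
The short-term limitation can be pictured as a one-way latch on the status that drives the execution mode: a switch from backlog=true to backlog=false is honoured, while a later switch back to true is ignored. The class below is a hypothetical illustration, not how FLIP-327 is implemented.

```java
// Hypothetical one-way latch: only backlog=true -> backlog=false is honoured.
class OneWayBacklogLatch {
    private boolean backlog;

    OneWayBacklogLatch(boolean initialBacklog) {
        this.backlog = initialBacklog;
    }

    // Applies a newly reported status and returns the effective one.
    boolean update(boolean reportedBacklog) {
        if (!reportedBacklog) {
            backlog = false; // switching to (or staying at) false is always allowed
        }
        // A report of true can never turn a false status back to true.
        return backlog;
    }

    public static void main(String[] args) {
        OneWayBacklogLatch latch = new OneWayBacklogLatch(true);
        System.out.println(latch.update(true));  // true: still processing backlog
        System.out.println(latch.update(false)); // false: switched to real-time processing
        System.out.println(latch.update(true));  // false: the later "true" is ignored
    }
}
```

Removing the limitation would amount to letting `update(true)` set the status back to true, which corresponds to the traffic-spike case.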
> > >> > > > > > > >
> > >> > > > > > > > The capability to switch from backlog=true to backlog=false
> > >> > > > > > > > will mitigate a lot of problems we are facing now, as it is
> > >> > > > > > > > common for users to start a Flink job to process backlog data
> > >> > > > > > > > followed by real-time data. On the other hand, switching from
> > >> > > > > > > > backlog=false to backlog=true is useful when there is a traffic
> > >> > > > > > > > spike while the Flink job is processing real-time data, which
> > >> > > > > > > > is also worth addressing but less important than the previous
> > >> > > > > > > > one.
> > >> > > > > > > >
> > >> > > > > > > > Given that both features require considerable changes to the underlying
> > >> > > > > > > > runtime, we think it might be useful and safe to tackle them one by one.
> > >> > > > > > > >
> > >> > > > > > > > Thanks again for the comments. Please let us know what you think.
> > >> > > > > > > >
> > >> > > > > > > > Best,
> > >> > > > > > > > Dong
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > > >
> > >> > > > > > > > > Best,
> > >> > > > > > > > > Jark
> > >> > > > > > > > >
> > >> > > > > > > > >
> > >> > > > > > > > >
> > >> > > > > > > > > On Fri, 8 Sept 2023 at 19:09, Xuannan Su <suxuanna...@gmail.com> wrote:
> > >> > > > > > > > >
> > >> > > > > > > > > > Hi Jark and Leonard,
> > >> > > > > > > > > >
> > >> > > > > > > > > > Thanks for the comments. Please see my reply below.
> > >> > > > > > > > > >
> > >> > > > > > > > > > @Jark
> > >> > > > > > > > > >
> > >> > > > > > > > > > > I think a better API doesn't compete with itself. Therefore, I'm in favor
> > >> > > > > > > > > > > of supporting the watermark lag threshold for each source without
> > >> > > > > > > > > > > introducing any framework API and configuration.
> > >> > > > > > > > > >
> > >> > > > > > > > > > I don't think supporting the watermark lag threshold for each source can
> > >> > > > > > > > > > avoid the competition problem. In the case where a user wants to use a CDC
> > >> > > > > > > > > > source and also determine the backlog status based on watermark lag, we
> > >> > > > > > > > > > still need to define a rule for when both apply. That said, I think it is
> > >> > > > > > > > > > more intuitive to combine them with a logical OR operation, as the
> > >> > > > > > > > > > strategies (FLIP-309, FLIP-328) only determine when the source's backlog
> > >> > > > > > > > > > status should be true. What do you think?
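The logical OR rule proposed above can be illustrated with a minimal sketch in plain Java. It only models the decision logic and is not Flink's API. The `BacklogCombiner` class is hypothetical, and each strategy is reduced to a `BooleanSupplier` that can only claim "backlog". Examples would be a status reported by the source as in FLIP-309, or a watermark-lag check as in FLIP-328.

```java
import java.util.List;
import java.util.function.BooleanSupplier;

/** Hypothetical model: each strategy can only claim "backlog"; results are OR-ed together. */
class BacklogCombiner {
    private final List<BooleanSupplier> strategies;

    public BacklogCombiner(List<BooleanSupplier> strategies) {
        this.strategies = List.copyOf(strategies);
    }

    /** Backlog if ANY strategy says so; false only when no strategy claims backlog. */
    public boolean isProcessingBacklog() {
        for (BooleanSupplier strategy : strategies) {
            if (strategy.getAsBoolean()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        boolean sourceReportsBacklog = false; // e.g. a CDC source in its changelog phase
        boolean watermarkLagTooHigh = true;   // e.g. lag exceeds the configured threshold
        List<BooleanSupplier> strategies =
                List.of(() -> sourceReportsBacklog, () -> watermarkLagTooHigh);
        System.out.println(new BacklogCombiner(strategies).isProcessingBacklog()); // true
    }
}
```

With OR semantics, a source that reports false can still be considered to be processing backlog when another strategy says so. One example is a CDC source in its changelog phase whose watermark lag exceeds the threshold.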
> > >> > > > > > > > > >
> > >> > > > > > > > > > > Besides, this can address another concern that the watermark may be
> > >> > > > > > > > > > > generated by DataStream#assignTimestampsAndWatermarks, which doesn't work
> > >> > > > > > > > > > > with the backlog.watermark-lag-threshold job config
> > >> > > > > > > > > >
> > >> > > > > > > > > > The description of the configuration explicitly states that "a source
> > >> > > > > > > > > > would report isProcessingBacklog=true if its watermark lag exceeds the
> > >> > > > > > > > > > configured value". Users should therefore not be confused that
> > >> > > > > > > > > > DataStream#assignTimestampsAndWatermarks doesn't work with
> > >> > > > > > > > > > backlog.watermark-lag-threshold, as it is not a source.
> > >> > > > > > > > > >
> > >> > > > > > > > > > > Does that mean the job can never go back to streaming mode once it
> > >> > > > > > > > > > > switches into backlog mode? It doesn't sound like a complete FLIP to me.
> > >> > > > > > > > > > > Is it possible to support switching back in this FLIP?
> > >> > > > > > > > > >
> > >> > > > > > > > > > I think the description in the FLIP actually means the other way around,
> > >> > > > > > > > > > where the job can never switch back to batch mode once it has switched
> > >> > > > > > > > > > into streaming mode. This is to align with the current state of
> > >> > > > > > > > > > FLIP-327[1], where only switching from batch to stream mode is supported.
> > >> > > > > > > > > >
> > >> > > > > > > > > > @Leonard
> > >> > > > > > > > > >
> > >> > > > > > > > > > > > The FLIP describes: "And it should report isProcessingBacklog=false at
> > >> > > > > > > > > > > > the beginning of the snapshot stage."
> > >> > > > > > > > > > > This should be “changelog stage”.
> > >> > > > > > > > > >
> > >> > > > > > > > > > I think the description is in FLIP-309. Thanks for pointing it out. I have
> > >> > > > > > > > > > updated the description.
> > >> > > > > > > > > >
> > >> > > > > > > > > > > I'm not sure if it's enough to support this feature only in the FLIP-27
> > >> > > > > > > > > > > Source. Although we are pushing for the SourceFunction API to be removed,
> > >> > > > > > > > > > > these APIs will survive one or two versions in the Flink repo before they
> > >> > > > > > > > > > > are actually removed.
> > >> > > > > > > > > >
> > >> > > > > > > > > > I agree that it is good to support the SourceFunction API. However, given
> > >> > > > > > > > > > that the SourceFunction API is marked as deprecated, I plan to prioritize
> > >> > > > > > > > > > supporting the FLIP-27 Source. We can support the SourceFunction API after
> > >> > > > > > > > > > the FLIP-27 Source. What do you think?
> > >> > > > > > > > > >
> > >> > > > > > > > > > Best regards,
> > >> > > > > > > > > > Xuannan
> > >> > > > > > > > > >
> > >> > > > > > > > > > [1]
> > >> > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data
> > >> > > > > > > > > >
> > >> > > > > > > > > >
> > >> > > > > > > > > >
> > >> > > > > > > > > >
> > >> > > > > > > > > > On Fri, Sep 8, 2023 at 1:02 AM Leonard Xu <xbjt...@gmail.com> wrote:
> > >> > > > > > > > > > >
> > >> > > > > > > > > > > Thanks, Xuannan, for driving this FLIP!
> > >> > > > > > > > > > >
> > >> > > > > > > > > > > The proposal generally looks good to me, but I still have some comments:
> > >> > > > > > > > > > >
> > >> > > > > > > > > > > > One more question about the FLIP: it says "Note that this config does not
> > >> > > > > > > > > > > > support switching source's isProcessingBacklog from false to true for
> > >> > > > > > > > > > > > now." Does that mean the job can never go back to streaming mode once it
> > >> > > > > > > > > > > > switches into backlog mode? It doesn't sound like a complete FLIP to me.
> > >> > > > > > > > > > > > Is it possible to support switching back in this FLIP?
> > >> > > > > > > > > > > +1 to Jark's concern. IIUC, the state transition of isProcessingBacklog
> > >> > > > > > > > > > > depends on whether the source is processing backlog data or not.
> > >> > > > > > > > > > > Different sources will have different backlog statuses, which may change
> > >> > > > > > > > > > > over time. From a general perspective, we should not have this
> > >> > > > > > > > > > > restriction.
> > >> > > > > > > > > > >
> > >> > > > > > > > > > > > The FLIP describes: "And it should report isProcessingBacklog=false at
> > >> > > > > > > > > > > > the beginning of the snapshot stage."
> > >> > > > > > > > > > > This should be “changelog stage”.
> > >> > > > > > > > > > >
> > >> > > > > > > > > > > I'm not sure if it's enough to support this feature only in the FLIP-27
> > >> > > > > > > > > > > Source. Although we are pushing for the SourceFunction API to be removed,
> > >> > > > > > > > > > > these APIs will survive one or two versions in the Flink repo before they
> > >> > > > > > > > > > > are actually removed.
> > >> > > > > > > > > > >
> > >> > > > > > > > > > > Best,
> > >> > > > > > > > > > > Leonard
> > >> > > > > > > > > > >
> > >> > > > > > > > > > > >
> > >> > > > > > > > > > > > Best,
> > >> > > > > > > > > > > > Jark
> > >> > > > > > > > > > > >
> > >> > > > > > > > > > > >
> > >> > > > > > > > > > > > On Thu, 7 Sept 2023 at 13:51, Xuannan Su <suxuanna...@gmail.com> wrote:
> > >> > > > > > > > > > > >
> > >> > > > > > > > > > > >> Hi all,
> > >> > > > > > > > > > > >>
> > >> > > > > > > > > > > >> Thank you for all the reviews and suggestions.
> > >> > > > > > > > > > > >>
> > >> > > > > > > > > > > >> I believe all the comments have been addressed. If there are no further
> > >> > > > > > > > > > > >> comments, I plan to open the voting thread for this FLIP early next week.
> > >> > > > > > > > > > > >>
> > >> > > > > > > > > > > >> Best regards,
> > >> > > > > > > > > > > >> Xuannan
> > >> > > > > > > > > > > >>
> > >> > > > > > > > > > > >> On Thu, Sep 7, 2023 at 12:09 AM Jing Ge <j...@ververica.com.invalid> wrote:
> > >> > > > > > > > > > > >>>
> > >> > > > > > > > > > > >>> Hi Xuannan,
> > >> > > > > > > > > > > >>>
> > >> > > > > > > > > > > >>> I thought FLIP-328 would compete with FLIP-309 when setting the value of
> > >> > > > > > > > > > > >>> the backlog status. Understood. Thanks for the hint.
> > >> > > > > > > > > > > >>>
> > >> > > > > > > > > > > >>> Best regards,
> > >> > > > > > > > > > > >>> Jing
> > >> > > > > > > > > > > >>>
> > >> > > > > > > > > > > >>> On Wed, Sep 6, 2023 at 12:12 PM Xuannan Su <suxuanna...@gmail.com> wrote:
> > >> > > > > > > > > > > >>>
> > >> > > > > > > > > > > >>>> Hi Jing,
> > >> > > > > > > > > > > >>>>
> > >> > > > > > > > > > > >>>> Thank you for the clarification.
> > >> > > > > > > > > > > >>>>
> > >> > > > > > > > > > > >>>> For the use case you mentioned, I believe we can utilize the HybridSource,
> > >> > > > > > > > > > > >>>> as updated in FLIP-309[1], to determine the backlog status. For example, if
> > >> > > > > > > > > > > >>>> the user wants to process data before time T in batch mode and after time
> > >> > > > > > > > > > > >>>> T in stream mode, they can set the first source of the HybridSource to read
> > >> > > > > > > > > > > >>>> up to time T and the last source of the HybridSource to read from time T.
> > >> > > > > > > > > > > >>>>
> > >> > > > > > > > > > > >>>> Best,
> > >> > > > > > > > > > > >>>> Xuannan
> > >> > > > > > > > > > > >>>>
> > >> > > > > > > > > > > >>>> [1]
> > >> > > > > > > > > > > >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > >> > > > > > > > > > > >>>>
> > >> > > > > > > > > > > >>>>
> > >> > > > > > > > > > > >>>> On Mon, Sep 4, 2023 at 10:36 PM Jing Ge <j...@ververica.com.invalid> wrote:
> > >> > > > > > > > > > > >>>>>
> > >> > > > > > > > > > > >>>>> Hi Xuannan,
> > >> > > > > > > > > > > >>>>>
> > >> > > > > > > > > > > >>>>> Thanks for the clarification.
> > >> > > > > > > > > > > >>>>>
> > >> > > > > > > > > > > >>>>> 3. Event time and processing time are two different things. It might be
> > >> > > > > > > > > > > >>>>> rarely used, but conceptually, users can process data from the past within
> > >> > > > > > > > > > > >>>>> a specific time range in streaming mode. All data before that range would
> > >> > > > > > > > > > > >>>>> be considered backlog and would need to be processed in batch mode, much
> > >> > > > > > > > > > > >>>>> like the present perfect progressive tense in English.
> > >> > > > > > > > > > > >>>>>
> > >> > > > > > > > > > > >>>>> Best regards,
> > >> > > > > > > > > > > >>>>> Jing
> > >> > > > > > > > > > > >>>>>
> > >> > > > > > > > > > > >>>>> On Thu, Aug 31, 2023 at 4:45 AM Xuannan Su <suxuanna...@gmail.com> wrote:
> > >> > > > > > > > > > > >>>>>
> > >> > > > > > > > > > > >>>>>> Hi Jing,
> > >> > > > > > > > > > > >>>>>>
> > >> > > > > > > > > > > >>>>>> Thanks for the reply.
> > >> > > > > > > > > > > >>>>>>
> > >> > > > > > > > > > > >>>>>> 1. You are absolutely right that the watermark lag threshold must be
> > >> > > > > > > > > > > >>>>>> carefully set with a thorough understanding of watermark generation. It is
> > >> > > > > > > > > > > >>>>>> crucial for users to take the WatermarkStrategy into account when setting
> > >> > > > > > > > > > > >>>>>> the watermark lag threshold.
> > >> > > > > > > > > > > >>>>>>
> > >> > > > > > > > > > > >>>>>> 2. Regarding pure processing-time based stream processing jobs, alternative
> > >> > > > > > > > > > > >>>>>> strategies will be implemented to determine whether the job is processing
> > >> > > > > > > > > > > >>>>>> backlog data. I have outlined two possible strategies below:
> > >> > > > > > > > > > > >>>>>>
> > >> > > > > > > > > > > >>>>>> - Based on the source operator's state. For example, when the MySQL CDC
> > >> > > > > > > > > > > >>>>>>   source is reading the snapshot, it can claim isBacklog=true.
> > >> > > > > > > > > > > >>>>>> - Based on metrics. For example, when busyTimeMsPerSecond (or
> > >> > > > > > > > > > > >>>>>>   backPressuredTimeMsPerSecond) > user_specified_threshold, then
> > >> > > > > > > > > > > >>>>>>   isBacklog=true.
> > >> > > > > > > > > > > >>>>>>
> > >> > > > > > > > > > > >>>>>> The strategies proposed in this FLIP rely on generated watermarks.
> > >> > > > > > > > > > > >>>>>> Therefore, if a user intends for the job to detect the backlog status based
> > >> > > > > > > > > > > >>>>>> on watermarks, the job must generate watermarks.
> > >> > > > > > > > > > > >>>>>>
> > >> > > > > > > > > > > >>>>>> 3. I'm afraid I'm not fully grasping your question. From my understanding,
> > >> > > > > > > > > > > >>>>>> it should work in both cases. When event times are close to the processing
> > >> > > > > > > > > > > >>>>>> time, resulting in watermarks close to the processing time, the job is not
> > >> > > > > > > > > > > >>>>>> processing backlog data. On the other hand, when event times are far from
> > >> > > > > > > > > > > >>>>>> the processing time, the watermarks are also far behind, and if the lag
> > >> > > > > > > > > > > >>>>>> exceeds the defined threshold, the job is considered to be processing
> > >> > > > > > > > > > > >>>>>> backlog data.
> > >> > > > > > > > > > > >>>>>>
> > >> > > > > > > > > > > >>>>>> Best,
> > >> > > > > > > > > > > >>>>>> Xuannan
> > >> > > > > > > > > > > >>>>>>
> > >> > > > > > > > > > > >>>>>>
> > >> > > > > > > > > > > >>>>>>> On Aug 31, 2023, at 02:56, Jing Ge <j...@ververica.com.INVALID> wrote:
> > >> > > > > > > > > > > >>>>>>>
> > >> > > > > > > > > > > >>>>>>> Hi Xuannan,
> > >> > > > > > > > > > > >>>>>>>
> > >> > > > > > > > > > > >>>>>>> Thanks for the clarification. That is the part where I am trying to
> > >> > > > > > > > > > > >>>>>>> understand your thoughts. I have some follow-up questions:
> > >> > > > > > > > > > > >>>>>>>
> > >> > > > > > > > > > > >>>>>>> 1. It depends strongly on the watermarkStrategy and on what the customized
> > >> > > > > > > > > > > >>>>>>> watermark generation looks like. It mixes business logic with technical
> > >> > > > > > > > > > > >>>>>>> implementation and the technical data processing mode. The value of the
> > >> > > > > > > > > > > >>>>>>> watermark lag threshold must be set very carefully. If the value is too
> > >> > > > > > > > > > > >>>>>>> small, then whenever the watermark generation logic changes (e.g. a
> > >> > > > > > > > > > > >>>>>>> business logic change causes the threshold to be exceeded), the same job
> > >> > > > > > > > > > > >>>>>>> might surprisingly run in backlog processing mode, i.e. a butterfly effect.
> > >> > > > > > > > > > > >>>>>>> Comprehensive documentation is required to avoid any confusion for users.
> > >> > > > > > > > > > > >>>>>>> 2. As Jark already mentioned, use cases that do not have watermarks, like
> > >> > > > > > > > > > > >>>>>>> pure processing-time based stream processing[1], are not covered. It is
> > >> > > > > > > > > > > >>>>>>> more or less a trade-off solution that does not support such use cases,
> > >> > > > > > > > > > > >>>>>>> and appropriate documentation is required. Forcing users to explicitly
> > >> > > > > > > > > > > >>>>>>> generate watermarks that are never needed just for this purpose does not
> > >> > > > > > > > > > > >>>>>>> sound like a proper solution.
> > >> > > > > > > > > > > >>>>>>> 3. If I am not mistaken, it only works for use cases where event times are
> > >> > > > > > > > > > > >>>>>>> very close to the processing times, because the wall clock is used to
> > >> > > > > > > > > > > >>>>>>> calculate the watermark lag and the watermark is generated based on the
> > >> > > > > > > > > > > >>>>>>> event time.
> > >> > > > > > > > > > > >>>>>>>
> > >> > > > > > > > > > > >>>>>>> Best regards,
> > >> > > > > > > > > > > >>>>>>> Jing
> > >> > > > > > > > > > > >>>>>>>
> > >> > > > > > > > > > > >>>>>>> [1]
> > >> > > > > > > > > > > >>>>>>> https://github.com/apache/flink/blob/2c50b4e956305426f478b726d4de4a640a16b810/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L236
> > >> > > > > > > > > > > >>>>>>>
> > >> > > > > > > > > > > >>>>>>> On Wed, Aug 30, 2023 at 4:06 AM Xuannan Su <suxuanna...@gmail.com> wrote:
> > >> > > > > > > > > > > >>>>>>>
> > >> > > > > > > > > > > >>>>>>>> Hi Jing,
> > >> > > > > > > > > > > >>>>>>>>
> > >> > > > > > > > > > > >>>>>>>> Thank you for the suggestion.
> > >> > > > > > > > > > > >>>>>>>>
> > >> > > > > > > > > > > >>>>>>>> The definition of watermark lag is the same as the watermarkLag metric in
> > >> > > > > > > > > > > >>>>>>>> FLIP-33[1]. More specifically, the watermark lag is computed at the time
> > >> > > > > > > > > > > >>>>>>>> when a watermark is emitted downstream, as follows:
> > >> > > > > > > > > > > >>>>>>>> watermarkLag = CurrentTime - Watermark. I have added this description to
> > >> > > > > > > > > > > >>>>>>>> the FLIP.
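The formula above, watermarkLag = CurrentTime - Watermark, and its comparison against a lag threshold can be sketched in plain Java. The class and method names are hypothetical. Treating "exceeds" as a strict greater-than comparison is an assumption, based on the configuration description quoted elsewhere in this thread ("if its watermark lag exceeds the configured value").

```java
import java.time.Duration;

/** Hypothetical helper mirroring watermarkLag = CurrentTime - Watermark (not a Flink class). */
class WatermarkLag {

    /** Lag in milliseconds, evaluated when a watermark is emitted downstream. */
    public static long watermarkLagMs(long currentTimeMs, long watermarkMs) {
        return currentTimeMs - watermarkMs;
    }

    /** Assumed rule: backlog only if the lag is strictly greater than the threshold. */
    public static boolean exceedsThreshold(long currentTimeMs, long watermarkMs, Duration threshold) {
        return watermarkLagMs(currentTimeMs, watermarkMs) > threshold.toMillis();
    }

    public static void main(String[] args) {
        long now = 1_700_000_000_000L;                          // wall-clock time in ms
        long watermark = now - Duration.ofHours(2).toMillis();  // watermark 2 hours behind
        Duration threshold = Duration.ofHours(1);               // illustrative threshold value
        System.out.println(watermarkLagMs(now, watermark));              // 7200000
        System.out.println(exceedsThreshold(now, watermark, threshold)); // true
    }
}
```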
> > >> > > > > > > > > > > >>>>>>>>
> > >> > > > > > > > > > > >>>>>>>> I hope this addresses your concern.
> > >> > > > > > > > > > > >>>>>>>>
> > >> > > > > > > > > > > >>>>>>>> Best,
> > >> > > > > > > > > > > >>>>>>>> Xuannan
> > >> > > > > > > > > > > >>>>>>>>
> > >> > > > > > > > > > > >>>>>>>> [1]
> > >> > > > > > > > > > > >>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > >> > > > > > > > > > > >>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>> On Aug 28, 2023, at 01:04, Jing Ge <j...@ververica.com.INVALID> wrote:
> > >> > > > > > > > > > > >>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>> Hi Xuannan,
> > >> > > > > > > > > > > >>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>> Thanks for the proposal. +1 for me.
> > >> > > > > > > > > > > >>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>> There is one tiny thing that I am not sure I understand correctly. Since
> > >> > > > > > > > > > > >>>>>>>>> there will be many different WatermarkStrategies and WatermarkGenerators,
> > >> > > > > > > > > > > >>>>>>>>> could you please update the FLIP and add a description of exactly how the
> > >> > > > > > > > > > > >>>>>>>>> watermark lag is calculated? E.g. watermark lag = A - B, where A is the
> > >> > > > > > > > > > > >>>>>>>>> timestamp of the watermark emitted downstream and B is....(this is the part
> > >> > > > > > > > > > > >>>>>>>>> I am not really sure about after reading the FLIP).
> > >> > > > > > > > > > > >>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>> Best regards,
> > >> > > > > > > > > > > >>>>>>>>> Jing
> > >> > > > > > > > > > > >>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>> On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su <suxuanna...@gmail.com> wrote:
> > >> > > > > > > > > > > >>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>> Hi Jark,
> > >> > > > > > > > > > > >>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>> Thanks for the comments.
> > >> > > > > > > > > > > >>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>> I agree that the current solution cannot support jobs that cannot define
> > >> > > > > > > > > > > >>>>>>>>>> watermarks. However, after considering the pending-record-based solution, I
> > >> > > > > > > > > > > >>>>>>>>>> believe the current solution is superior for the target use case, as it is
> > >> > > > > > > > > > > >>>>>>>>>> more intuitive for users. The backlog status gives users the ability to
> > >> > > > > > > > > > > >>>>>>>>>> balance throughput and latency, and making this trade-off decision based on
> > >> > > > > > > > > > > >>>>>>>>>> the watermark lag is more intuitive from the user's perspective. For
> > >> > > > > > > > > > > >>>>>>>>>> instance, a user can decide that if the job lags behind the current time by
> > >> > > > > > > > > > > >>>>>>>>>> more than 1 hour, the result is not usable. In that case, we can optimize
> > >> > > > > > > > > > > >>>>>>>>>> for throughput when the data lags behind by more than an hour. With the
> > >> > > > > > > > > > > >>>>>>>>>> pending-record-based solution, it's challenging for users to determine when
> > >> > > > > > > > > > > >>>>>>>>>> to optimize for throughput and when to prioritize latency.
> > >> > > > > > > > > > > >>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>> Regarding the limitations of the watermark-based solution:
> > >> > > > > > > > > > > >>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>> 1. The current solution can support jobs with sources that have event time.
> > >> > > > > > > > > > > >>>>>>>>>> Users can always define a watermark at the source operator, even if it's
> > >> > > > > > > > > > > >>>>>>>>>> not used by downstream operators, such as streaming join and unbounded
> > >> > > > > > > > > > > >>>>>>>>>> aggregate.
> > >> > > > > > > > > > > >>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>> 2. I don't believe it's accurate to say that the watermark lag will keep
> > >> > > > > > > > > > > >>>>>>>>>> increasing if no data is generated in Kafka. The watermark lag and backlog
> > >> > > > > > > > > > > >>>>>>>>>> status are determined at the moment when the watermark is emitted to the
> > >> > > > > > > > > > > >>>>>>>>>> downstream operator. If no data is emitted from the source, the watermark
> > >> > > > > > > > > > > >>>>>>>>>> lag and backlog status will not be updated. If a WatermarkStrategy with
> > >> > > > > > > > > > > >>>>>>>>>> idleness is used, the source becomes non-backlog when it becomes idle.
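The update semantics described above can be modelled with a small state holder in plain Java. The status is re-evaluated only when a watermark is emitted, stays unchanged while no data arrives, and is cleared when the source becomes idle. This is a hypothetical sketch. `BacklogTracker`, its callback names and the initial non-backlog state are assumptions, not Flink classes or defaults.

```java
/**
 * Hypothetical per-source tracker: the backlog status is re-evaluated only when a
 * watermark is emitted, and is cleared when the source is marked idle.
 */
class BacklogTracker {
    private final long thresholdMs;
    private boolean backlog; // assumed initial status: not processing backlog

    public BacklogTracker(long thresholdMs) {
        this.thresholdMs = thresholdMs;
    }

    /** Called when the source emits a watermark downstream: lag = now - watermark. */
    public void onWatermarkEmitted(long watermarkMs, long currentTimeMs) {
        backlog = (currentTimeMs - watermarkMs) > thresholdMs;
    }

    /** Called when an idleness-aware watermark strategy marks the source idle. */
    public void onIdle() {
        backlog = false;
    }

    public boolean isBacklog() {
        return backlog;
    }

    public static void main(String[] args) {
        BacklogTracker tracker = new BacklogTracker(3_600_000L); // 1 hour threshold
        tracker.onWatermarkEmitted(0L, 7_200_000L);              // lag = 2 hours
        System.out.println(tracker.isBacklog());                 // true
        // No new data means no new watermark, so the status is simply not re-evaluated.
        System.out.println(tracker.isBacklog());                 // true
        tracker.onIdle();                                        // source declared idle
        System.out.println(tracker.isBacklog());                 // false
    }
}
```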
> > >> > > > > > > > > > > >>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>> 3. I think watermark lag is a more intuitive way to determine whether a
> > >> > > > > > > > > > > >>>>>>>>>> job is processing backlog data. Pending records face a similar issue.
> > >> > > > > > > > > > > >>>>>>>>>> For example, if the source has 1K pending records, those records may
> > >> > > > > > > > > > > >>>>>>>>>> span 1 day, 1 hour, or 1 second. If the records span 1 day, it's
> > >> > > > > > > > > > > >>>>>>>>>> probably best to optimize for throughput. If they span 1 hour, it
> > >> > > > > > > > > > > >>>>>>>>>> depends on the business logic. If they span 1 second, optimizing for
> > >> > > > > > > > > > > >>>>>>>>>> latency is likely the better choice.
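The emission-time semantics described in point 2 can be sketched in plain Java. This is only a minimal sketch under stated assumptions, not FLIP-328's implementation: the class `WatermarkLagBacklogTracker`, its methods, and the lag definition (processing time minus the emitted watermark) are all hypothetical.

```java
/**
 * Hypothetical sketch (not Flink API): the backlog status is re-evaluated only
 * when a watermark is emitted, and an idle source reports "not backlog".
 */
final class WatermarkLagBacklogTracker {
    private final long lagThresholdMs;
    private boolean processingBacklog;

    WatermarkLagBacklogTracker(long lagThresholdMs) {
        this.lagThresholdMs = lagThresholdMs;
    }

    /** The only place where the lag is measured: when a watermark goes downstream. */
    boolean onWatermarkEmitted(long watermarkMs, long nowMs) {
        long watermarkLagMs = nowMs - watermarkMs; // assumed lag definition
        processingBacklog = watermarkLagMs > lagThresholdMs;
        return processingBacklog;
    }

    /** Called when the source is marked idle, e.g. by a WatermarkStrategy with idleness. */
    boolean onIdle() {
        processingBacklog = false;
        return processingBacklog;
    }

    /** Returns the last evaluated status; wall-clock time passing alone never changes it. */
    boolean isProcessingBacklog() {
        return processingBacklog;
    }
}
```

With a 60-second threshold, a watermark that trails the clock by one hour puts the tracker into backlog, and only a later watermark emission or an idleness signal takes it out again. There is no clock-driven drift, which matches the behavior point 2 describes.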
> > >> > > > > > > > > > > >>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>> In summary, I believe the watermark-based solution is a superior choice
> > >> > > > > > > > > > > >>>>>>>>>> for the target use case where watermark/event time can be defined.
> > >> > > > > > > > > > > >>>>>>>>>> Additionally, I haven't come across a scenario that requires low-latency
> > >> > > > > > > > > > > >>>>>>>>>> processing and reads from a source that cannot define watermarks. If we
> > >> > > > > > > > > > > >>>>>>>>>> encounter such a use case, we can create another FLIP to address those
> > >> > > > > > > > > > > >>>>>>>>>> needs in the future. What do you think?
> > >> > > > > > > > > > > >>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>> Best,
> > >> > > > > > > > > > > >>>>>>>>>> Xuannan
> > >> > > > > > > > > > > >>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>> On Aug 20, 2023, at 23:27, Jark Wu <imj...@gmail.com> wrote:
> > >> > > > > > > > > > > >>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>> Hi Xuannan,
> > >> > > > > > > > > > > >>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>> Thanks for opening this discussion.
> > >> > > > > > > > > > > >>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>> This current proposal may work in the mentioned watermark cases.
> > >> > > > > > > > > > > >>>>>>>>>>> However, it seems this is not a general solution for sources to
> > >> > > > > > > > > > > >>>>>>>>>>> determine "isProcessingBacklog".
> > >> > > > > > > > > > > >>>>>>>>>>> From my point of view, there are 3 limitations of the current proposal:
> > >> > > > > > > > > > > >>>>>>>>>>> 1. It doesn't cover jobs that don't have watermark/event-time defined,
> > >> > > > > > > > > > > >>>>>>>>>>>    for example streaming join and unbounded aggregate. We may still
> > >> > > > > > > > > > > >>>>>>>>>>>    need to figure out solutions for them.
> > >> > > > > > > > > > > >>>>>>>>>>> 2. Watermark lag cannot be trusted, because it increases without bound
> > >> > > > > > > > > > > >>>>>>>>>>>    if no data is generated in Kafka, but in that case there is no
> > >> > > > > > > > > > > >>>>>>>>>>>    backlog at all.
> > >> > > > > > > > > > > >>>>>>>>>>> 3. Watermark lag does not reflect the amount of backlog well. If the
> > >> > > > > > > > > > > >>>>>>>>>>>    watermark lag is 1 day, 1 hour, or 1 second, there may be only 1
> > >> > > > > > > > > > > >>>>>>>>>>>    pending record, which means no backlog at all.
> > >> > > > > > > > > > > >>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>> Therefore, IMO, the watermark may not be the ideal metric for
> > >> > > > > > > > > > > >>>>>>>>>>> determining "isProcessingBacklog".
> > >> > > > > > > > > > > >>>>>>>>>>> What we need is something that reflects the number of records not yet
> > >> > > > > > > > > > > >>>>>>>>>>> processed by the job.
> > >> > > > > > > > > > > >>>>>>>>>>> Actually, that is the "pendingRecords" metric proposed in FLIP-33 [1],
> > >> > > > > > > > > > > >>>>>>>>>>> which has been implemented by the Kafka source.
> > >> > > > > > > > > > > >>>>>>>>>>> Did you consider using the "pendingRecords" metric to determine
> > >> > > > > > > > > > > >>>>>>>>>>> "isProcessingBacklog"?
> > >> > > > > > > > > > > >>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>> Best,
> > >> > > > > > > > > > > >>>>>>>>>>> Jark
> > >> > > > > > > > > > > >>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > >> > > > > > > > > > > >>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>
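The disagreement between Jark's point 3 and Xuannan's point 3 reduces to two threshold rules. The sketch below is hypothetical: the class `BacklogRules` and both thresholds are illustrative, and only the name "pendingRecords" comes from FLIP-33.

```java
/** Hypothetical comparison of the two backlog rules discussed in this thread. */
final class BacklogRules {
    private BacklogRules() {}

    /** Backlog if more records are waiting than the threshold (pendingRecords-style rule). */
    static boolean byPendingRecords(long pendingRecords, long pendingThreshold) {
        return pendingRecords > pendingThreshold;
    }

    /** Backlog if the watermark trails processing time by more than the threshold. */
    static boolean byWatermarkLag(long watermarkLagMs, long lagThresholdMs) {
        return watermarkLagMs > lagThresholdMs;
    }
}
```

For Jark's example (a one-day watermark lag with a single pending record), `byPendingRecords(1, 10_000)` returns false while `byWatermarkLag(86_400_000, 60_000)` returns true. Conversely, for Xuannan's example, 1,000 pending records give `byPendingRecords` the same answer whether they span one day or one second, whereas a lag-based rule can, in principle, tell the two apart.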
> > >> > > > > > > > > > > >>>>>>>>>>> On Tue, 15 Aug 2023 at 12:04, Xintong Song <tonysong...@gmail.com> wrote:
> > >> > > > > > > > > > > >>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>> Sounds good to me.
> > >> > > > > > > > > > > >>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>> It is true that, if we are introducing the generalized watermark, there
> > >> > > > > > > > > > > >>>>>>>>>>>> will be other watermark related concepts / configurations that need to
> > >> > > > > > > > > > > >>>>>>>>>>>> be updated anyway.
> > >> > > > > > > > > > > >>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>> Best,
> > >> > > > > > > > > > > >>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>> Xintong
> > >> > > > > > > > > > > >>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>> On Tue, Aug 15, 2023 at 11:30 AM Xuannan Su <suxuanna...@gmail.com> wrote:
> > >> > > > > > > > > > > >>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>> Hi Xintong,
> > >> > > > > > > > > > > >>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>> Thank you for your suggestion.
> > >> > > > > > > > > > > >>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>> After considering the idea of using a general configuration key, I think
> > >> > > > > > > > > > > >>>>>>>>>>>>> it may not be a good idea for the reasons below.
> > >> > > > > > > > > > > >>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>> While I agree that using a more general configuration key gives us the
> > >> > > > > > > > > > > >>>>>>>>>>>>> flexibility to switch to other approaches for calculating the lag in the
> > >> > > > > > > > > > > >>>>>>>>>>>>> future, the downside is that it may confuse users. We currently have
> > >> > > > > > > > > > > >>>>>>>>>>>>> fetchEventTimeLag, emitEventTimeLag, and watermarkLag in the source, and
> > >> > > > > > > > > > > >>>>>>>>>>>>> it is not clear which specific lag we are referring to. With the
> > >> > > > > > > > > > > >>>>>>>>>>>>> potential introduction of the Generalized Watermark mechanism in the
> > >> > > > > > > > > > > >>>>>>>>>>>>> future, if I understand correctly, a watermark won't necessarily need to
> > >> > > > > > > > > > > >>>>>>>>>>>>> be a timestamp. I am concerned that the general configuration key may not
> > >> > > > > > > > > > > >>>>>>>>>>>>> be enough to cover all the use cases, and we will need to introduce a
> > >> > > > > > > > > > > >>>>>>>>>>>>> general way to determine the backlog status regardless.
> > >> > > > > > > > > > > >>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>> For the reasons above, I prefer introducing the configuration as is, and
> > >> > > > > > > > > > > >>>>>>>>>>>>> changing it later through a deprecation or migration process. What do
> > >> > > > > > > > > > > >>>>>>>>>>>>> you think?
> > >> > > > > > > > > > > >>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>> Best,
> > >> > > > > > > > > > > >>>>>>>>>>>>> Xuannan
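Xuannan's point that a bare "lag" key is ambiguous can be shown by computing three lags for one record. The sketch is hypothetical: the class `SourceLags` and the numbers are illustrative. The fetch and emit lags roughly follow FLIP-33's descriptions of `currentFetchEventTimeLag` and `currentEmitEventTimeLag`, and the watermark lag is assumed to be processing time minus the current watermark.

```java
/**
 * Hypothetical illustration of three different "lags" for the same record.
 * Fetch and emit lags follow FLIP-33's descriptions; the watermark lag
 * (processing time minus current watermark) is an assumption.
 */
final class SourceLags {
    final long fetchEventTimeLagMs;
    final long emitEventTimeLagMs;
    final long watermarkLagMs;

    SourceLags(long eventTimeMs, long fetchTimeMs, long emitTimeMs,
               long currentWatermarkMs, long nowMs) {
        this.fetchEventTimeLagMs = fetchTimeMs - eventTimeMs; // how late the record was fetched
        this.emitEventTimeLagMs = emitTimeMs - eventTimeMs;   // how late the record was emitted
        this.watermarkLagMs = nowMs - currentWatermarkMs;     // how far the watermark trails the clock
    }
}
```

With an event time of 100 s, a fetch at 105 s, an emit at 106 s, and a watermark of 98 s (all hypothetical values), the three lags come out as 5 s, 6 s, and 8 s, so a key named only after "lag" leaves open which of them a threshold applies to.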
> > >> > > > > > > > > > > >>>>>>>>>>>>> On Aug 14, 2023, 14:09 +0800, Xintong Song <tonysong...@gmail.com>, wrote:
> > >> > > > > > > > > > > >>>>>>>>>>>>>> Thanks for the explanation.
> > >> > > > > > > > > > > >>>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>>> I wonder if it makes sense to not expose this detail via the
> > >> > > > > > > > > > > >>>>>>>>>>>>>> configuration option. To be specific, I suggest not mentioning the
> > >> > > > > > > > > > > >>>>>>>>>>>>>> "watermark" keyword in the configuration key and description.
> > >> > > > > > > > > > > >>>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>>> - From the users' perspective, I think they only need to know that when
> > >> > > > > > > > > > > >>>>>>>>>>>>>>   the lag is higher than the given threshold, Flink will consider the
> > >> > > > > > > > > > > >>>>>>>>>>>>>>   latency of individual records less important and prioritize throughput
> > >> > > > > > > > > > > >>>>>>>>>>>>>>   over it. They don't really need the details of how the lags are
> > >> > > > > > > > > > > >>>>>>>>>>>>>>   calculated.
> > >> > > > > > > > > > > >>>>>>>>>>>>>> - For the internal implementation, I also think using watermark lags is
> > >> > > > > > > > > > > >>>>>>>>>>>>>>   a good idea, for the reasons you've already mentioned. However, it's
> > >> > > > > > > > > > > >>>>>>>>>>>>>>   not the only possible option. Hiding this detail from users would give
> > >> > > > > > > > > > > >>>>>>>>>>>>>>   us the flexibility to switch to other approaches if needed in the
> > >> > > > > > > > > > > >>>>>>>>>>>>>>   future.
> > >> > > > > > > > > > > >>>>>>>>>>>>>> - We are currently working on designing the ProcessFunction API
> > >> > > > > > > > > > > >>>>>>>>>>>>>>   (consider it as a DataStream API V2). There's an idea to introduce a
> > >> > > > > > > > > > > >>>>>>>>>>>>>>   Generalized Watermark mechanism, where basically the watermark can be
> > >> > > > > > > > > > > >>>>>>>>>>>>>>   anything that needs to travel along the data flow with certain
> > >> > > > > > > > > > > >>>>>>>>>>>>>>   alignment strategies, and the event-time watermark would be one
> > >> > > > > > > > > > > >>>>>>>>>>>>>>   specific case of it. This is still an idea and has not been discussed
> > >> > > > > > > > > > > >>>>>>>>>>>>>>   and agreed on by the community, and we are preparing a FLIP for it.
> > >> > > > > > > > > > > >>>>>>>>>>>>>>   But if we go for it, the concept "watermark-lag-threshold" could be
> > >> > > > > > > > > > > >>>>>>>>>>>>>>   ambiguous.
> > >> > > > > > > > > > > >>>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>>> I do not intend to block the FLIP on this. I'd also be fine with
> > >> > > > > > > > > > > >>>>>>>>>>>>>> introducing the configuration as is, and changing it later, if needed,
> > >> > > > > > > > > > > >>>>>>>>>>>>>> with a regular deprecation and migration process. Just making my
> > >> > > > > > > > > > > >>>>>>>>>>>>>> suggestions.
> > >> > > > > > > > > > > >>>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>>> Best,
> > >> > > > > > > > > > > >>>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>>> Xintong
> > >> > > > > > > > > > > >>>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>>>
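Xintong's suggestion of a generic, watermark-free threshold could look roughly like the following design sketch. All names (`LagEstimator`, `WatermarkLagEstimator`, `LagThresholdBacklogPolicy`) are hypothetical; the point is only that the threshold check never needs to know how the lag is measured.

```java
/** Hypothetical abstraction: how the lag is measured is an internal detail. */
interface LagEstimator {
    /** Most recent lag estimate, in milliseconds. */
    long currentLagMs();
}

/** One possible internal strategy: the lag is measured when a watermark is emitted. */
final class WatermarkLagEstimator implements LagEstimator {
    private long lastLagMs;

    void onWatermarkEmitted(long watermarkMs, long nowMs) {
        lastLagMs = nowMs - watermarkMs;
    }

    @Override
    public long currentLagMs() {
        return lastLagMs;
    }
}

/** The user-facing knob is only a lag threshold; it works with any estimator. */
final class LagThresholdBacklogPolicy {
    private final LagEstimator estimator;
    private final long lagThresholdMs;

    LagThresholdBacklogPolicy(LagEstimator estimator, long lagThresholdMs) {
        this.estimator = estimator;
        this.lagThresholdMs = lagThresholdMs;
    }

    boolean isProcessingBacklog() {
        return estimator.currentLagMs() > lagThresholdMs;
    }
}
```

Swapping `WatermarkLagEstimator` for another strategy (for example, one built on a generalized watermark) would leave the threshold, and therefore a configuration key that does not mention "watermark", unchanged.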
> > >> > > > > > > > > > > >>>>>>>>>>>>>> On Mon, Aug 14, 2023 at 12:00 PM Xuannan Su <suxuanna...@gmail.com> wrote:
> > >> > > > > > > > > > > >>>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>>>> Hi Xintong,
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>>>> Thanks for the reply.
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>>>> I considered using the timestamps in the records to determine the
> > >> > > > > > > > > > > >>>>>>>>>>>>>>> backlog status, and in the end decided to use the watermark. By
> > >> > > > > > > > > > > >>>>>>>>>>>>>>> definition, the watermark is the indication of time progress in the data
> > >> > > > > > > > > > > >>>>>>>>>>>>>>> stream: it indicates that the stream's event time has progressed to some
> > >> > > > > > > > > > > >>>>>>>>>>>>>>> specific time. On the other hand, the timestamps in the records are
> > >> > > > > > > > > > > >>>>>>>>>>>>>>> usually used to generate the watermark. Therefore, it appears more
> > >> > > > > > > > > > > >>>>>>>>>>>>>>> appropriate and intuitive to calculate the event-time lag from the
> > >> > > > > > > > > > > >>>>>>>>>>>>>>> watermark and use it to determine the backlog status. And by using the
> > >> > > > > > > > > > > >>>>>>>>>>>>>>> watermark, we can easily deal with out-of-order and idle data.
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>>>> Please let me know if you have
> > further
> > >> > > > > questions.
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>>>> Best,
> > >> > > > > > > > > > > >>>>>>>>>>>>>>> Xuannan
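Xuannan's argument that watermarks handle out-of-order data more gracefully than raw record timestamps can be illustrated with a dependency-free imitation of a bounded-out-of-orderness watermark generator. Flink ships a real `BoundedOutOfOrdernessWatermarks`; the class below is a simplified stand-in written for this example, not that class, and the lag definition is again an assumption.

```java
/**
 * Simplified, hypothetical imitation of a bounded-out-of-orderness watermark
 * generator: watermark = max event timestamp seen - bound - 1.
 */
final class OutOfOrderWatermarks {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampMs;

    OutOfOrderWatermarks(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Chosen so that the initial watermark is exactly Long.MIN_VALUE (no overflow).
        this.maxTimestampMs = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    /** A late record cannot lower maxTimestampMs, so the watermark never moves back. */
    void onEvent(long eventTimestampMs) {
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
    }

    long currentWatermark() {
        return maxTimestampMs - maxOutOfOrdernessMs - 1;
    }

    /** Assumed lag definition; only meaningful after at least one event has been seen. */
    long watermarkLagMs(long nowMs) {
        return nowMs - currentWatermark();
    }
}
```

Feeding events at 10 s, 4 s (late), and 12 s with a 2-second bound yields watermarks of 7,999 ms, 7,999 ms, and 9,999 ms. The late record does not move the watermark back, whereas a lag computed from the latest record's own timestamp would jump by six seconds when that record arrived.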
> > >> > > > > > > > > > > >>>>>>>>>>>>>>> On Aug 10, 2023, 20:23 +0800, Xintong Song <tonysong...@gmail.com>, wrote:
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>> Thanks for preparing the FLIP, Xuannan.
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>> +1 in general.
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>> A quick question: could you explain why we are relying on the watermark
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>> for emitting the record attribute? Why not use the timestamps in the
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>> records? I don't see any concern with using watermarks. Just wondering if
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>> there are any deeper considerations behind this.
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>> Best,
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>> Xintong
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>> On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su <suxuanna...@gmail.com> wrote:
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>> Hi all,
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>> I am opening this thread to discuss FLIP-328: Allow source operators to
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>> determine isProcessingBacklog based on watermark lag [1]. We had several
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>> discussions with Dong Ling about the design, and I thank him for all the
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>> valuable advice.
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>> The FLIP targets the use case where users want to run a Flink job that
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>> backfills historical data with high throughput and then continues
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>> processing real-time data with low latency. Building upon the backlog
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>> concept introduced in FLIP-309 [2], this proposal enables sources to
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>> report their backlog-processing status based on the watermark lag.
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>> We would greatly appreciate any comments or feedback you may have on
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>> this proposal.
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>> Best,
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>> Xuannan
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> > >> > > > > > > > > > > >>>>>>>>>>>>>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
