Hi all,

Jark and I discussed this FLIP offline and I will summarize our discussion
below. It would be great if you could provide your opinion of the proposed
options.

Regarding the target use-cases:
- We both agreed that MySQL CDC should have backlog=true when watermarkLag
is large during the binlog phase.
- Dong argued that other streaming sources with watermarkLag defined (e.g.
Kafka) should also have backlog=true when watermarkLag is large. The
pros/cons discussion below assumes this use-case needs to be supported.

The 1st option is what is currently proposed in FLIP-328, with the
following key characteristics:
1) There is one job-level config (i.e.
pipeline.backlog.watermark-lag-threshold) that applies to all sources with
watermarkLag metric defined.
2) The semantics of `#setIsProcessingBacklog(false)` is that it overrides
the effect of the previous invocation (if any) of
`#setIsProcessingBacklog(true)` on the given source instance.
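
As a rough illustration of option-1, here is a minimal sketch of how a
single source's backlog status could be resolved. It is a hypothetical
model, not the actual Flink runtime code: the class name, the
`isBacklog(...)` method and the millisecond parameters are assumptions made
for this example. Only `setIsProcessingBacklog` and the job-level threshold
come from the FLIP discussion.

```java
/** Hypothetical model of option-1 for one source; not Flink runtime code. */
class Option1BacklogSketch {
    // Last value passed to setIsProcessingBacklog(...); null if never invoked.
    private Boolean lastExplicit = null;

    public void setIsProcessingBacklog(boolean value) {
        // A later call only overrides the earlier call on this source.
        lastExplicit = value;
    }

    /**
     * Resolves the source's backlog status as the logical OR of two rules.
     *
     * @param watermarkLagMs current watermark lag, or null if the metric is undefined
     * @param thresholdMs job-level threshold, or null if it is not configured
     */
    public boolean isBacklog(Long watermarkLagMs, Long thresholdMs) {
        boolean explicitSaysTrue = lastExplicit != null && lastExplicit;
        boolean lagSaysTrue =
                watermarkLagMs != null && thresholdMs != null && watermarkLagMs > thresholdMs;
        return explicitSaysTrue || lagSaysTrue;
    }
}
```

In this model, a source that called `setIsProcessingBacklog(false)` can
still be reported as backlog=true if its watermark lag exceeds the
threshold, because no rule ever asserts that the status must be false.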

The 2nd option is what Jark proposed in this email thread, with the
following key characteristics:
1) Add source-specific config (both Java API and SQL source property) to
every source for which we want to set backlog status based on the
watermarkLag metric. For example, we might add separate Java APIs
`#setWatermarkLagThreshold` for MySQL CDC source, HybridSource,
KafkaSource, PulsarSource, etc.
2) The semantics of `#setIsProcessingBacklog(false)` is that the given
source instance will have backlog=false.
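
As a rough illustration of option-2, here is a minimal sketch in which the
source owns its threshold and the last call to `setIsProcessingBacklog` is
authoritative, reading the rule above as "the source's backlog status
becomes false". It is a hypothetical model, not a proposed implementation:
the class name, the `onWatermarkLag(...)` hook and the millisecond
parameters are assumptions made for this example.

```java
/** Hypothetical model of option-2 for one source; not Flink runtime code. */
class Option2SourceSketch {
    // Per-source threshold; null means the user did not configure one.
    private Long watermarkLagThresholdMs = null;
    // The source's backlog status, fully determined by the last explicit call.
    private boolean backlog = false;

    public void setWatermarkLagThreshold(long thresholdMs) {
        this.watermarkLagThresholdMs = thresholdMs;
    }

    public void setIsProcessingBacklog(boolean value) {
        // Under option-2, false is authoritative: the status becomes false.
        this.backlog = value;
    }

    // Hypothetical hook: the source re-evaluates its own rule when the lag changes.
    public void onWatermarkLag(long watermarkLagMs) {
        if (watermarkLagThresholdMs != null) {
            setIsProcessingBacklog(watermarkLagMs > watermarkLagThresholdMs);
        }
    }

    public boolean isBacklog() {
        return backlog;
    }
}
```

In this model, only the source itself can raise its status to true, so
every source that should react to watermark lag needs its own threshold
setting.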

Here are the key pros/cons of these two options.

Cons of the 1st option:
1) The semantics of `#setIsProcessingBacklog(false)` is harder to
understand for Flink operator developers than the corresponding semantics
in option-2.

Cons of the 2nd option:
1) More work for end-users. For a job with multiple sources that need to be
configured with a watermark lag threshold, users need to specify multiple
configs (one for each source) instead of specifying one job-level config.

2) More work for Flink operator developers. Overall there are more public
APIs (one Java API and one SQL property for each source that needs to
determine backlog based on watermark lag) exposed to end users. This also
adds more burden for the Flink community to maintain these APIs.

3) It would be hard (e.g. require a backward-incompatible API change) to
extend the Flink runtime to support a job-level config for setting the
backlog strategy in the future (e.g. support the
pipeline.backlog.watermark-lag-threshold in option-1). This is because an
existing source operator's code might have hardcoded an invocation of
`#setIsProcessingBacklog(false)`, which under option-2 means the backlog
status must be set to false. That prevents the Flink runtime from setting
backlog=true when a new strategy is triggered.

Overall, I am still inclined to choose option-1 because it is more
extensible and simpler to use in the long term when we want to support/use
multiple sources whose backlog status can change based on the watermark
lag. While option-1's `#setIsProcessingBacklog` semantics are a bit harder
to understand than option-2's, I think this overhead/cost is worthwhile as
it makes end-users' lives easier in the long term.

Jark: thank you for taking the time to review this FLIP. Please feel free
to comment if I missed anything in the pros/cons above.

Jark and I have not reached agreement on which option is better. It would
be really helpful if we could get more comments on these options.

Thanks,
Dong


On Tue, Sep 19, 2023 at 11:26 AM Dong Lin <lindon...@gmail.com> wrote:

> Hi Jark,
>
> Thanks for the reply. Please see my comments inline.
>
> On Tue, Sep 19, 2023 at 10:12 AM Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Dong,
>>
>> Sorry for the late reply.
>>
>> > The rationale is that if there is any strategy that is triggered and
>> says
>> > backlog=true, then job's backlog should be true. Otherwise, the job's
>> > backlog status is false.
>>
>> I'm quite confused about this. Does that mean, if the source is in the
>> changelog phase, the source has to continuously invoke
>> "setIsProcessingBacklog(true)" (in an infinite loop?). Otherwise,
>> the job's backlog status would be set to false by the framework?
>>
>
> No, the source would not have to continuously invoke
> setIsProcessingBacklog(true) in an infinite loop.
>
> Actually, I am not very sure why there is confusion that "the job's
> backlog status would be set to false by the framework". Could you explain
> where that comes from?
>
> I guess it might be useful to provide a complete overview of how
> setIsProcessingBacklog(...)
> and pipeline.backlog.watermark-lag-threshold work together to determine the
> overall job's backlog status. Let me explain it below.
>
> Here is the semantics/behavior of setIsProcessingBacklog(..).
> - This method is invoked on a per-source basis.
> - This method can be invoked multiple times with true/false as its input
> parameter.
> - For a given source, the last invocation of this method overwrites the
> effect of any earlier invocation of this method.
> - For a given source, if this method has been invoked at least once and
> the last invocation of this method has isProcessingBacklog = true, then it
> is guaranteed that the backlog status of this source is set to true.
>
> Here is the semantics/behavior of pipeline.backlog.watermark-lag-threshold:
> - This config is specified at the job level and applies to every source
> included in this job.
> - For a given source, if its watermarkLag metric is available (see
> FLIP-33) and watermarkLag > watermark-lag-threshold, then it is guaranteed
> that backlog status of this source is set to true.
>
> Here is how the source's backlog status is determined: If a rule specified
> above says the source's backlog status should be true, then the source's
> backlog status is set to true. Otherwise, it is set to false.
>
> Here is how the job's backlog status is determined: If there exists a
> source that is currently running and the source's backlog status is set to
> true, then the job's backlog status is set to true. Otherwise, it is set to
> false.
>
> Hopefully this can help clarify the behavior. If needed, we can update the
> relevant doc (e.g. setIsProcessingBacklog() Java doc) to make semantics
> above clearer for Flink users.
>
> And I would be happy to discuss/explain this design offline when you have
> time.
>
> Thanks,
> Dong
>
>
>>
>> Best,
>> Jark
>>
>> On Tue, 19 Sept 2023 at 09:13, Dong Lin <lindon...@gmail.com> wrote:
>>
>> > Hi Jark,
>> >
>> > Do you have time to comment on whether the current design looks good?
>> >
>> > I plan to start voting in 3 days if there is no follow-up comment.
>> >
>> > Thanks,
>> > Dong
>> >
>> >
>> > On Fri, Sep 15, 2023 at 2:01 PM Jark Wu <imj...@gmail.com> wrote:
>> >
>> > > Hi Dong,
>> > >
>> > > > Note that we can not simply enforce the semantics of "any
>> invocation of
>> > > > setIsProcessingBacklog(false) will set the job's backlog status to
>> > > false".
>> > > > Suppose we have a job with two operators, where operatorA invokes
>> > > > setIsProcessingBacklog(false) and operatorB invokes
>> > > > setIsProcessingBacklog(true). There will be conflict if we use the
>> > > > semantics of "any invocation of setIsProcessingBacklog(false) will
>> set
>> > > the
>> > > > job's backlog status to false".
>> > >
>> > > So it should set job's backlog status to false if the job has only a
>> > single
>> > > source,
>> > > right?
>> > >
>> > > Could you elaborate on the behavior if there is a job with a single
>> > source,
>> > > and the watermark lag exceeds the configured value (should set
>> backlog to
>> > > true?),
>> > > but the source invokes "setIsProcessingBacklog(false)"? Or the inverse
>> > one,
>> > > the source invokes "setIsProcessingBacklog(false)" first, but the
>> > watermark
>> > > lag
>> > > exceeds the configured value.
>> > >
>> > > This is the conflict I'm concerned about.
>> > >
>> > > Best,
>> > > Jark
>> > >
>> > > On Fri, 15 Sept 2023 at 12:00, Dong Lin <lindon...@gmail.com> wrote:
>> > >
>> > > > Hi Jark,
>> > > >
>> > > > Please see my comments inline.
>> > > >
>> > > > On Fri, Sep 15, 2023 at 10:35 AM Jark Wu <imj...@gmail.com> wrote:
>> > > >
>> > > > > Hi Dong,
>> > > > >
>> > > > > Please see my comments inline below.
>> > > >
>> > > >
>> > > > > > Hmm.. can you explain what you mean by "different watermark
>> delay
>> > > > > > definitions for each source"?
>> > > > >
>> > > > > For example, "table1" defines a watermark with delay 5 seconds,
>> > > > > "table2" defines a watermark with delay 10 seconds. They have
>> > different
>> > > > > watermark delay definitions. So it is also reasonable they have
>> > > different
>> > > > > watermark lag definitions, e.g., "table1" allows "10mins" and
>> > "table2"
>> > > > > allows "20mins".
>> > > > >
>> > > >
>> > > > I think the watermark delay you mentioned above is conceptually /
>> > > > fundamentally different from the watermark-lag-threshold proposed in
>> > this
>> > > > FLIP.
>> > > >
>> > > > It might be useful to revisit the semantics of these two concepts:
>> > > > - watermark delay is used to account for the maximum amount of
>> > > out-of-orderliness
>> > > > that users expect (or are willing to wait for) for records from a given
>> > > source.
>> > > > - watermark-lag-threshold is used to define when processing latency
>> is
>> > no
>> > > > longer important (e.g. because data is already stale).
>> > > >
>> > > > Even though users might expect different out of orderliness for
>> > different
>> > > > sources, users do not necessarily have different definitions /
>> > thresholds
>> > > > for when a record is considered "already stale".
>> > > >
>> > > >
>> > > > >
>> > > > > > I think there is probably misunderstanding here. FLIP-309 does
>> NOT
>> > > > > directly
>> > > > > > specify when backlog is false. It is intentionally specified in
>> > such
>> > > a
>> > > > > way
>> > > > > > that there will  not be any conflict between these rules.
>> > > > >
>> > > > > Do you mean FLIP-309 doesn't allow to specify backlog to be false?
>> > > > > Is this mentioned in FLIP-309? This is completely different from
>> > what I
>> > > > >
>> > > >
>> > > > Can you explain what you mean by "allow to specify backlog to be
>> > false"?
>> > > >
>> > > > If what you mean is that "can invoke setIsProcessingBacklog(false)",
>> > then
>> > > > FLIP-309 supports doing this.
>> > > >
>> > > > If what you mean is that "any invocation of
>> > setIsProcessingBacklog(false)
>> > > > will set the job's backlog status to false", then FLIP-309 does not
>> > > support
>> > > > this. I believe the existing Java doc of this API and FLIP-309 is
>> > > > compatible with this explanation.
>> > > >
>> > > > Note that we can not simply enforce the semantics of "any
>> invocation of
>> > > > setIsProcessingBacklog(false) will set the job's backlog status to
>> > > false".
>> > > > Suppose we have a job with two operators, where operatorA invokes
>> > > > setIsProcessingBacklog(false) and operatorB invokes
>> > > > setIsProcessingBacklog(true). There will be conflict if we use the
>> > > > semantics of "any invocation of setIsProcessingBacklog(false) will
>> set
>> > > the
>> > > > job's backlog status to false".
>> > > >
>> > > > Would this answer your question?
>> > > >
>> > > > Best,
>> > > > Dong
>> > > >
>> > > >
>> > > > > understand. From the API interface
>> > > "ctx.setIsProcessingBacklog(boolean)",
>> > > > > it allows users to invoke "setIsProcessingBacklog(false)". And
>> > FLIP-309
>> > > > > also says "MySQL CDC source should report
>> isProcessingBacklog=false
>> > > > > at the beginning of the changelog stage." If not, maybe we need to
>> > > > revisit
>> > > > > FLIP-309.
>> > > >
>> > > >
>> > > > > Best,
>> > > > > Jark
>> > > > >
>> > > > >
>> > > > >
>> > > > > On Fri, 15 Sept 2023 at 08:41, Dong Lin <lindon...@gmail.com>
>> wrote:
>> > > > >
>> > > > > > Hi Jark,
>> > > > > >
>> > > > > > Do you have any follow-up comment?
>> > > > > >
>> > > > > > My gut feeling is that suppose we need to support per-source
>> > > watermark
>> > > > > lag
>> > > > > > specification in the future (not sure we have a use-case for
>> this
>> > > right
>> > > > > > now), we can add such a config in the future with a follow-up
>> FLIP.
>> > > The
>> > > > > > job-level config will still be useful as it makes users'
>> > > configuration
>> > > > > > simpler for common scenarios.
>> > > > > >
>> > > > > > If it is OK, can we agree to make incremental progress for Flink
>> > and
>> > > > > start
>> > > > > > a voting thread for this FLIP?
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Dong
>> > > > > >
>> > > > > >
>> > > > > > On Mon, Sep 11, 2023 at 4:41 PM Jark Wu <imj...@gmail.com>
>> wrote:
>> > > > > >
>> > > > > > > Hi Dong,
>> > > > > > >
>> > > > > > > Please see my comments inline.
>> > > > > > >
>> > > > > > > >  As a result, the proposed job-level
>> > > > > > > > config will be applied only in the changelog stage. So
>> there is
>> > > no
>> > > > > > > > difference between these two approaches in this particular
>> > case,
>> > > > > right?
>> > > > > > >
>> > > > > > > How the job-level config can be applied ONLY in the changelog
>> > > stage?
>> > > > > > > I think it is only possible if it is implemented by the CDC
>> > source
>> > > > > > itself,
>> > > > > > > because the framework doesn't know which stage of the source
>> is.
>> > > > > > > Know that the CDC source may emit watermarks with a very small
>> > lag
>> > > > > > > in the snapshot stage, and the job-level config may turn the
>> > > backlog
>> > > > > > > status into false.
>> > > > > > >
>> > > > > > > > On the other hand, per-source config will be necessary if
>> users
>> > > > want
>> > > > > to
>> > > > > > > > apply different watermark lag thresholds for different
>> sources
>> > in
>> > > > the
>> > > > > > > same
>> > > > > > > > job.
>> > > > > > >
>> > > > > > > We also have different watermark delay definitions for each
>> > source,
>> > > > > > > I think this's also reasonable and necessary to have different
>> > > > > watermark
>> > > > > > > lags.
>> > > > > > >
>> > > > > > >
>> > > > > > > > Each source can have its own rule that specifies when the
>> > backlog
>> > > > can
>> > > > > > be
>> > > > > > > true
>> > > > > > > > (e.g. MySql CDC says the backlog should be true during the
>> > > snapshot
>> > > > > > > stage).
>> > > > > > > > And we can have a job-level config that specifies when the
>> > > backlog
>> > > > > > should
>> > > > > > > > be true. Note that it is designed in such a way that none of
>> > > these
>> > > > > > rules
>> > > > > > > > specify when the backlog should be false. That is why there
>> is
>> > no
>> > > > > > > conflict
>> > > > > > > > by definition.
>> > > > > > >
>> > > > > > > IIUC, FLIP-309 provides `setIsProcessingBacklog` to specify
>> when
>> > > the
>> > > > > > > backlog
>> > > > > > > is true and when is FALSE. This conflicts with the job-level
>> > config
>> > > > as
>> > > > > it
>> > > > > > > will turn
>> > > > > > > the status into true.
>> > > > > > >
>> > > > > > > > If I understand your comments correctly, you mean that we
>> might
>> > > > have
>> > > > > a
>> > > > > > > > Flink SQL DDL with user-defined watermark expressions. And
>> > users
>> > > > also
>> > > > > > > want
>> > > > > > > > to set the backlog to true if the watermark generated by
>> that
>> > > > > > > > user-specified expression exceeds a threshold.
>> > > > > > >
>> > > > > > > No. I mean the source may not support generating watermarks,
>> so
>> > the
>> > > > > > > watermark
>> > > > > > > expression is applied in a following operator (instead of in
>> the
>> > > > source
>> > > > > > > operator).
>> > > > > > > This will result in the watermark lag doesn't work in this
>> case
>> > and
>> > > > > > confuse
>> > > > > > > users.
>> > > > > > >
>> > > > > > > > You are right that this is a limitation. However, this is
>> only
>> > a
>> > > > > > > short-term
>> > > > > > > > limitation which we added to make sure that we can focus on
>> the
>> > > > > > > capability
>> > > > > > > > to switch from backlog=true to backlog=false. In the
>> future, we
>> > > > will
>> > > > > > > remove
>> > > > > > > > this limitation and also support switching from
>> backlog=false
>> > to
>> > > > > > > > backlog=true.
>> > > > > > >
>> > > > > > > I can understand it may be difficult to support runtime mode
>> > > > switching
>> > > > > > back
>> > > > > > > and forth.
>> > > > > > > However, I think this should be a limitation of FLIP-327, not
>> > > > FLIP-328.
>> > > > > > > IIUC,
>> > > > > > > FLIP-309 doesn't have this limitation, right? I just don't
>> > > understand
>> > > > > > > what's the
>> > > > > > > challenge to switch a flag?
>> > > > > > >
>> > > > > > > Best,
>> > > > > > > Jark
>> > > > > > >
>> > > > > > >
>> > > > > > > On Sun, 10 Sept 2023 at 19:44, Dong Lin <lindon...@gmail.com>
>> > > wrote:
>> > > > > > >
>> > > > > > > > Hi Jark,
>> > > > > > > >
>> > > > > > > > Thanks for the comments. Please see my comments inline.
>> > > > > > > >
>> > > > > > > > On Sat, Sep 9, 2023 at 4:13 PM Jark Wu <imj...@gmail.com>
>> > wrote:
>> > > > > > > >
>> > > > > > > > > Hi Xuannan,
>> > > > > > > > >
>> > > > > > > > > I leave my comments inline.
>> > > > > > > > >
>> > > > > > > > > > In the case where a user wants to
>> > > > > > > > > > use a CDC source and also determine backlog status
>> based on
>> > > > > > watermark
>> > > > > > > > > > lag, we still need to define the rule when that occurs
>> > > > > > > > >
>> > > > > > > > > This rule should be defined by the source itself (who
>> knows
>> > > > backlog
>> > > > > > > > best),
>> > > > > > > > > not by the framework. In the case of CDC source, it
>> reports
>> > > > > > > > isBacklog=true
>> > > > > > > > > during snapshot stage, and report isBacklog=false during
>> > > > changelog
>> > > > > > > stage
>> > > > > > > > if
>> > > > > > > > > watermark-lag is within the threshold.
>> > > > > > > > >
>> > > > > > > >
>> > > > > > > > I am not sure I fully understand the difference between
>> adding
>> > a
>> > > > > > > job-level
>> > > > > > > > config vs. adding a per-source config.
>> > > > > > > >
>> > > > > > > > In the case of CDC, its watermark lag should be either
>> > > undefined
>> > > > > or
>> > > > > > > > really large in the snapshot stage. As a result, the
>> proposed
>> > > > > job-level
>> > > > > > > > config will be applied only in the changelog stage. So
>> there is
>> > > no
>> > > > > > > > difference between these two approaches in this particular
>> > case,
>> > > > > right?
>> > > > > > > >
>> > > > > > > > There are two advantages of the job-level config over
>> > per-source
>> > > > > > config:
>> > > > > > > >
>> > > > > > > > 1) Configuration is simpler. For example, suppose a user
>> has a
>> > > > Flink
>> > > > > > job
>> > > > > > > > that consumes records from multiple Kafka sources and wants
>> to
>> > > > > > determine
>> > > > > > > > backlog status for these Kafka sources using the same
>> watermark
>> > > lag
>> > > > > > > > threshold, there is no need for users to repeatedly specify
>> > this
>> > > > > > > threshold
>> > > > > > > > for each source.
>> > > > > > > >
>> > > > > > > > 2) There is a smaller number of public APIs overall. In
>> > > particular,
>> > > > > > > instead
>> > > > > > > > of repeatedly adding a
>> > > setProcessingBacklogWatermarkLagThreshold()
>> > > > > API
>> > > > > > > for
>> > > > > > > > every source operator that has event-time watermark lag
>> defined,
>> > > we
>> > > > > only
>> > > > > > > > need to add one job-level config. Less public API means
>> better
>> > > > > > simplicity
>> > > > > > > > and maintainability in general.
>> > > > > > > >
>> > > > > > > > On the other hand, per-source config will be necessary if
>> users
>> > > > want
>> > > > > to
>> > > > > > > > apply different watermark lag thresholds for different
>> sources
>> > in
>> > > > the
>> > > > > > > same
>> > > > > > > > job. Personally, I find this a bit counter-intuitive for
>> users
>> > to
>> > > > > > specify
>> > > > > > > > different watermark lag thresholds in the same job.
>> > > > > > > >
>> > > > > > > > Do you think there is any real-world use-case that requires
>> > this?
>> > > > > Could
>> > > > > > > you
>> > > > > > > > provide a specific use-case where per-source config can
>> provide
>> > > an
>> > > > > > > > advantage over the job-level config?
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > > I think it's not intuitive to combine it with the logical
>> OR
>> > > > > > operation.
>> > > > > > > > > Even for the
>> > > > > > > > > combination logic of backlog status from different
>> channels,
>> > > > > FLIP-309
>> > > > > > > > said
>> > > > > > > > > it is
>> > > > > > > > > "up to the operator to determine its output records'
>> > isBacklog
>> > > > > value"
>> > > > > > > and
>> > > > > > > > > proposed
>> > > > > > > > > 3 different strategies. Therefore, I think backlog status
>> > from
>> > > a
>> > > > > > single
>> > > > > > > > > source should
>> > > > > > > > > be up to the source.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > For both the job-level config and the per-source config, it
>> is
>> > > > > > eventually
>> > > > > > > > up to the user to decide the computation logic of the
>> backlog
>> > > > status.
>> > > > > > > > Whether this mechanism is implemented at the per-source
>> level
>> > or
>> > > > > > > framework
>> > > > > > > > level is probably more like an implementation detail.
>> > > > > > > >
>> > > > > > > > Eventually, I think the choice between these two approaches
>> > > depends
>> > > > > on
>> > > > > > > > whether we have any use-case for users to specify different
>> > > > watermark
>> > > > > > lag
>> > > > > > > > thresholds in the same job.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > >
>> > > > > > > > > IMO, a better API design is not how to resolve conflicts
>> but
>> > > not
>> > > > > > > > > introducing conflicts.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > Just to clarify, the current FLIP does not introduce any
>> > > conflict.
>> > > > > Each
>> > > > > > > > source can have its own rule that specifies when the backlog
>> > can
>> > > be
>> > > > > > true
>> > > > > > > > (e.g. MySql CDC says the backlog should be true during the
>> > > snapshot
>> > > > > > > stage).
>> > > > > > > > And we can have a job-level config that specifies when the
>> > > backlog
>> > > > > > should
>> > > > > > > > be true. Note that it is designed in such a way that none of
>> > > these
>> > > > > > rules
>> > > > > > > > specify when the backlog should be false. That is why there
>> is
>> > no
>> > > > > > > conflict
>> > > > > > > > by definition.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > > Let the source determine backlog status removes the
>> conflicts
>> > > and I
>> > > > > > don't
>> > > > > > > > > see big
>> > > > > > > > > disadvantages.
>> > > > > > > > >
>> > > > > > > > > > It should not confuse the user that
>> > > > > > > > > > DataStream#assignTimestampsAndWatermarks doesn't work
>> with
>> > > > > > > > > > backlog.watermark-lag-threshold, as it is not a source.
>> > > > > > > > >
>> > > > > > > > > Hmm, so this configuration may confuse Flink SQL users,
>> > because
>> > > > all
>> > > > > > > > > watermarks
>> > > > > > > > > are defined on the source DDL, but it may use a separate
>> > > operator
>> > > > > to
>> > > > > > > emit
>> > > > > > > > > watermarks
>> > > > > > > > > if the source doesn't support emitting watermarks.
>> > > > > > > > >
>> > > > > > > >
>> > > > > > > > If I understand your comments correctly, you mean that we
>> might
>> > > > have
>> > > > > a
>> > > > > > > > Flink SQL DDL with user-defined watermark expressions. And
>> > users
>> > > > also
>> > > > > > > want
>> > > > > > > > to set the backlog to true if the watermark generated by
>> that
>> > > > > > > > user-specified expression exceeds a threshold.
>> > > > > > > >
>> > > > > > > > That is a good point and use-case. I agree we should also
>> cover
>> > > > this
>> > > > > > > > scenario. And we can update FLIP-328 to mention that the
>> > > job-level
>> > > > > > config
>> > > > > > > > will also be applicable when the watermark derived from the
>> > Flink
>> > > > SQL
>> > > > > > DDL
>> > > > > > > > exceeds this threshold. Would this address your concern?
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > >
>> > > > > > > > > > I think the description in the FLIP actually means the
>> > other
>> > > > way
>> > > > > > > > > > around, where the job can never switch back to batch
>> mode
>> > > once
>> > > > it
>> > > > > > has
>> > > > > > > > > > switched into streaming mode. This is to align with the
>> > > current
>> > > > > > state
>> > > > > > > > > > of FLIP-327[1], where only switching from batch to
>> stream
>> > > mode
>> > > > is
>> > > > > > > > > > supported.
>> > > > > > > > >
>> > > > > > > > > This sounds like a limitation of FLIP-327 (that execution
>> > mode
>> > > > > > depends
>> > > > > > > on
>> > > > > > > > > backlog status).
>> > > > > > > > > But the backlog status shouldn't have this limitation,
>> > because
>> > > it
>> > > > > is
>> > > > > > > not
>> > > > > > > > > only used for execution
>> > > > > > > > > switching.
>> > > > > > > > >
>> > > > > > > >
>> > > > > > > > You are right that this is a limitation. However, this is
>> only
>> > a
>> > > > > > > short-term
>> > > > > > > > limitation which we added to make sure that we can focus on
>> the
>> > > > > > > capability
>> > > > > > > > to switch from backlog=true to backlog=false. In the
>> future, we
>> > > > will
>> > > > > > > remove
>> > > > > > > > this limitation and also support switching from
>> backlog=false
>> > to
>> > > > > > > > backlog=true.
>> > > > > > > >
>> > > > > > > > The capability to switch from backlog=true to backlog=false
>> > will
>> > > > > > > mitigate a
>> > > > > > > > lot of problems we are facing now. As it is common for
>> users to
>> > > > > start a
>> > > > > > > > Flink job to process backlog data followed by real-time
>> data.
>> > On
>> > > > the
>> > > > > > > other
>> > > > > > > > hand, switching from backlog=false to backlog=true is useful
>> > when
>> > > > > there
>> > > > > > > is
>> > > > > > > > a traffic spike while the Flink job is processing real-time
>> > data,
>> > > > > which
>> > > > > > > is
>> > > > > > > > also useful to address but less important than the previous
>> > one.
>> > > > > > > >
>> > > > > > > > Given that both features require considerable changes to the
>> > > > > underlying
>> > > > > > > > runtime, we think it might be useful and safe to tackle them
>> > one
>> > > by
>> > > > > > one.
>> > > > > > > >
>> > > > > > > > Thanks again for the comments. Please let us know what you
>> > think.
>> > > > > > > >
>> > > > > > > > Best,
>> > > > > > > > Dong
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > >
>> > > > > > > > > Best,
>> > > > > > > > > Jark
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > On Fri, 8 Sept 2023 at 19:09, Xuannan Su <
>> > > suxuanna...@gmail.com>
>> > > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi Jark and Leonard,
>> > > > > > > > > >
>> > > > > > > > > > Thanks for the comments. Please see my reply below.
>> > > > > > > > > >
>> > > > > > > > > > @Jark
>> > > > > > > > > >
>> > > > > > > > > > > I think a better API doesn't compete with itself.
>> > > Therefore,
>> > > > > I'm
>> > > > > > in
>> > > > > > > > > > favor of
>> > > > > > > > > > > supporting the watermark lag threshold for each source
>> > > > without
>> > > > > > > > > > introducing
>> > > > > > > > > > > any framework API and configuration.
>> > > > > > > > > >
>> > > > > > > > > > I don't think supporting the watermark lag threshold for
>> > each
>> > > > > > source
>> > > > > > > > > > can avoid the competition problem. In the case where a
>> user
>> > > > wants
>> > > > > > to
>> > > > > > > > > > use a CDC source and also determine backlog status
>> based on
>> > > > > > watermark
>> > > > > > > > > > lag, we still need to define the rule when that occurs.
>> > With
>> > > > that
>> > > > > > > > > > said, I think it is more intuitive to combine it with
>> the
>> > > > logical
>> > > > > > OR
>> > > > > > > > > > operation, as the strategies (FLIP-309, FLIP-328) only
>> > > > determine
>> > > > > > when
>> > > > > > > > > > the source's backlog status should be True. What do you
>> > > think?
>> > > > > > > > > >
>> > > > > > > > > > > Besides, this can address another concern that the
>> > > watermark
>> > > > > may
>> > > > > > be
>> > > > > > > > > > > generated by DataStream#assignTimestampsAnd
>> > > > > > > > > > > Watermarks which doesn't
>> > > > > > > > > > > work with the backlog.watermark-lag-threshold job
>> config
>> > > > > > > > > >
>> > > > > > > > > > The description of the configuration explicitly states
>> that
>> > > "a
>> > > > > > source
>> > > > > > > > > > would report isProcessingBacklog=true if its watermark
>> lag
>> > > > > exceeds
>> > > > > > > the
>> > > > > > > > > > configured value". It should not confuse the user that
>> > > > > > > > > > DataStream#assignTimestampsAndWatermarks doesn't work
>> with
>> > > > > > > > > > backlog.watermark-lag-threshold, as it is not a source.
>> > > > > > > > > >
>> > > > > > > > > > > Does that mean the job can never back to streaming
>> mode
>> > > once
>> > > > > > > switches
>> > > > > > > > > > into
>> > > > > > > > > > > backlog mode? It sounds like not a complete FLIP to
>> me.
>> > Is
>> > > it
>> > > > > > > > possible
>> > > > > > > > > to
>> > > > > > > > > > > support switching back in this FLIP?
>> > > > > > > > > >
>> > > > > > > > > > I think the description in the FLIP actually means the
>> > other
>> > > > way
>> > > > > > > > > > around, where the job can never switch back to batch
>> mode
>> > > once
>> > > > it
>> > > > > > has
>> > > > > > > > > > switched into streaming mode. This is to align with the
>> > > current
>> > > > > > state
>> > > > > > > > > > of FLIP-327[1], where only switching from batch to
>> stream
>> > > mode
>> > > > is
>> > > > > > > > > > supported.
>> > > > > > > > > >
>> > > > > > > > > > @Leonard
>> > > > > > > > > >
>> > > > > > > > > > > > The FLIP describe that: And it should report
>> > > > > > > > > isProcessingBacklog=false
>> > > > > > > > > > at the beginning of the snapshot stage.
>> > > > > > > > > > > This should be “changelog stage”
>> > > > > > > > > >
>> > > > > > > > > > I think the description is in FLIP-309. Thanks for
>> pointing
>> > > > out.
>> > > > > I
>> > > > > > > > > > updated the description.
>> > > > > > > > > >
>> > > > > > > > > > > I'm not sure if it's enough to support this feature
>> only
>> > in
>> > > > > > FLIP-27
>> > > > > > > > > > Source. Although we are pushing the sourceFunction API
>> to
>> > be
>> > > > > > removed,
>> > > > > > > > > these
>> > > > > > > > > > APIs will be survive one or two versions in flink repo
>> > before
>> > > > > they
>> > > > > > > are
>> > > > > > > > > > actually removed.
>> > > > > > > > > >
>> > > > > > > > > > I agree that it is good to support the SourceFunction
>> API.
>> > > > > However,
>> > > > > > > > > > given that the SourceFunction API is marked as
>> deprecated,
>> > I
>> > > > > think
>> > > > > > I
>> > > > > > > > > > will prioritize supporting the FLIP-27 Source. We can
>> > support
>> > > > the
>> > > > > > > > > > SourceFunction API after the
>> > > > > > > > > > FLIP-27 source. What do you think?
>> > > > > > > > > >
>> > > > > > > > > > Best regards,
>> > > > > > > > > > Xuannan
>> > > > > > > > > >
>> > > > > > > > > > [1]
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > On Fri, Sep 8, 2023 at 1:02 AM Leonard Xu <
>> > xbjt...@gmail.com
>> > > >
>> > > > > > wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > Thanks Xuannan for driving this FLIP !
>> > > > > > > > > > >
>> > > > > > > > > > > The proposal generally looks good to me, but I still
>> left
>> > > > some
>> > > > > > > > > comments:
>> > > > > > > > > > >
>> > > > > > > > > > > > One more question about the FLIP is that the FLIP
>> says
>> > > > "Note
>> > > > > > that
>> > > > > > > > > this
>> > > > > > > > > > > > config does not support switching source's
>> > > > > isProcessingBacklog
>> > > > > > > from
>> > > > > > > > > > false to true
>> > > > > > > > > > > > for now.” Does that mean the job can never go back to
>> > > > streaming
>> > > > > > mode
>> > > > > > > > > once
>> > > > > > > > > > switches into
>> > > > > > > > > > > > backlog mode? It sounds like not a complete FLIP to
>> me.
>> > > Is
>> > > > it
>> > > > > > > > > possible
>> > > > > > > > > > to
>> > > > > > > > > > > > support switching back in this FLIP?
>> > > > > > > > > > > +1 for Jark’s concern, IIUC, the state transition of
>> > > > > > > > > IsProcessingBacklog
>> > > > > > > > > > depends on whether the data in the source is processing
>> > > backlog
>> > > > > > data
>> > > > > > > or
>> > > > > > > > > > not. Different sources will have different backlog
>> status
>> > and
>> > > > > which
>> > > > > > > may
>> > > > > > > > > > change over time. From a general perspective, we should
>> not
>> > > > have
>> > > > > > this
>> > > > > > > > > > restriction.
>> > > > > > > > > > >
>> > > > > > > > > > > > The FLIP describes that: And it should report
>> > > > > > > > > isProcessingBacklog=false
>> > > > > > > > > > at the beginning of the snapshot stage.
>> > > > > > > > > > > This should be “changelog stage”
>> > > > > > > > > > >
>> > > > > > > > > > > I'm not sure if it's enough to support this feature
>> only
>> > in
>> > > > > > FLIP-27
>> > > > > > > > > > Source. Although we are pushing the sourceFunction API
>> to
>> > be
>> > > > > > removed,
>> > > > > > > > > these
>> > > > > > > > > > APIs will survive one or two versions in the Flink repo
>> > before
>> > > > > they
>> > > > > > > are
>> > > > > > > > > > actually removed.
>> > > > > > > > > > >
>> > > > > > > > > > > Best,
>> > > > > > > > > > > Leonard
>> > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > Best,
>> > > > > > > > > > > > Jark
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Thu, 7 Sept 2023 at 13:51, Xuannan Su <
>> > > > > > suxuanna...@gmail.com>
>> > > > > > > > > > wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > >> Hi all,
>> > > > > > > > > > > >>
>> > > > > > > > > > > >> Thank you for all the reviews and suggestions.
>> > > > > > > > > > > >>
>> > > > > > > > > > > >> I believe all the comments have been addressed. If
>> > there
>> > > > are
>> > > > > > no
>> > > > > > > > > > > >> further comments, I plan to open the voting thread
>> for
>> > > > this
>> > > > > > FLIP
>> > > > > > > > > early
>> > > > > > > > > > > >> next week.
>> > > > > > > > > > > >>
>> > > > > > > > > > > >> Best regards,
>> > > > > > > > > > > >> Xuannan
>> > > > > > > > > > > >>
>> > > > > > > > > > > >> On Thu, Sep 7, 2023 at 12:09 AM Jing Ge
>> > > > > > > > <j...@ververica.com.invalid
>> > > > > > > > > >
>> > > > > > > > > > > >> wrote:
>> > > > > > > > > > > >>>
>> > > > > > > > > > > >>> Hi Xuannan,
>> > > > > > > > > > > >>>
>> > > > > > > > > > > >>> I thought FLIP-328 would compete with FLIP-309
>> while
>> > > > setting
>> > > > > > the
>> > > > > > > > > > value of
>> > > > > > > > > > > >>> the backlog. Understood. Thanks for the hint.
>> > > > > > > > > > > >>>
>> > > > > > > > > > > >>> Best regards,
>> > > > > > > > > > > >>> Jing
>> > > > > > > > > > > >>>
>> > > > > > > > > > > >>> On Wed, Sep 6, 2023 at 12:12 PM Xuannan Su <
>> > > > > > > > suxuanna...@gmail.com>
>> > > > > > > > > > > >> wrote:
>> > > > > > > > > > > >>>
>> > > > > > > > > > > >>>> Hi Jing,
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>> Thank you for the clarification.
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>> For the use case you mentioned, I believe we can
>> > > utilize
>> > > > > the
>> > > > > > > > > > > >>>> HybridSource, as updated in FLIP-309[1], to
>> > determine
>> > > > the
>> > > > > > > > backlog
>> > > > > > > > > > > >>>> status. For example, if the user wants to process
>> > data
>> > > > > > before
>> > > > > > > > > time T
>> > > > > > > > > > > >>>> in batch mode and after time T in stream mode,
>> they
>> > > can
>> > > > > set
>> > > > > > > the
>> > > > > > > > > > first
>> > > > > > > > > > > >>>> source of the HybridSource to read up to time T
>> and
>> > > the
>> > > > > last
>> > > > > > > > > source
>> > > > > > > > > > of
>> > > > > > > > > > > >>>> the HybridSource to read from time T.
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>> Best,
>> > > > > > > > > > > >>>> Xuannan
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>>
>> > > > > > > > > > > >>>> On Mon, Sep 4, 2023 at 10:36 PM Jing Ge
>> > > > > > > > > <j...@ververica.com.invalid
>> > > > > > > > > > >
>> > > > > > > > > > > >>>> wrote:
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> Hi Xuannan,
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> Thanks for the clarification.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> 3. Event time and processing time are two different
>> > > > things.
>> > > > > It
>> > > > > > > > might
>> > > > > > > > > > be
>> > > > > > > > > > > >>>> rarely
>> > > > > > > > > > > >>>>> used, but conceptually, users can process data
>> in
>> > the
>> > > > > past
>> > > > > > > > > within a
>> > > > > > > > > > > >>>>> specific time range in the streaming mode. All
>> data
>> > > > > before
>> > > > > > > that
>> > > > > > > > > > range
>> > > > > > > > > > > >>>> will
>> > > > > > > > > > > >>>>> be considered backlog and need to be
>> processed
>> > > in
>> > > > > the
>> > > > > > > > batch
>> > > > > > > > > > > >> mode,
>> > > > > > > > > > > >>>>> like, e.g. the Present Perfect Progressive tense
>> > used
>> > > > in
>> > > > > > > > English
>> > > > > > > > > > > >>>> language.
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> Best regards,
>> > > > > > > > > > > >>>>> Jing
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>> On Thu, Aug 31, 2023 at 4:45 AM Xuannan Su <
>> > > > > > > > > suxuanna...@gmail.com>
>> > > > > > > > > > > >>>> wrote:
>> > > > > > > > > > > >>>>>
>> > > > > > > > > > > >>>>>> Hi Jing,
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> Thanks for the reply.
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> 1. You are absolutely right that the watermark
>> lag
>> > > > > > threshold
>> > > > > > > > > must
>> > > > > > > > > > > >> be
>> > > > > > > > > > > >>>>>> carefully set with a thorough understanding of
>> > > > watermark
>> > > > > > > > > > > >> generation.
>> > > > > > > > > > > >>>> It is
>> > > > > > > > > > > >>>>>> crucial for users to take into account the
>> > > > > > WatermarkStrategy
>> > > > > > > > > when
>> > > > > > > > > > > >>>> setting
>> > > > > > > > > > > >>>>>> the watermark lag threshold.
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> 2. Regarding pure processing-time based stream
>> > > > > processing
>> > > > > > > > jobs,
>> > > > > > > > > > > >>>>>> alternative strategies will be implemented to
>> > > > determine
>> > > > > > > > whether
>> > > > > > > > > > the
>> > > > > > > > > > > >>>> job is
>> > > > > > > > > > > >>>>>> processing backlog data. I have outlined two
>> > > possible
>> > > > > > > > strategies
>> > > > > > > > > > > >> below:
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> - Based on the source operator's state. For
>> > example,
>> > > > > when
>> > > > > > > > MySQL
>> > > > > > > > > > CDC
>> > > > > > > > > > > >>>> source
>> > > > > > > > > > > >>>>>> is reading snapshot, it can claim
>> isBacklog=true.
>> > > > > > > > > > > >>>>>> - Based on metrics. For example, when
>> > > > > busyTimeMsPerSecond
>> > > > > > > (or
>> > > > > > > > > > > >>>>>> backPressuredTimeMsPerSecond) >
>> > > > > user_specified_threshold,
>> > > > > > > then
>> > > > > > > > > > > >>>>>> isBacklog=true.
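
The two alternative strategies listed above (source-state based and metric based) can be sketched in plain Java as follows. This is only an illustration and not Flink code: the class name `BacklogStrategiesSketch`, the `CdcPhase` enum, and both method names are hypothetical, and the strict greater-than comparison against the user-specified threshold is an assumption.

```java
/** Hypothetical sketch, not part of Flink: two ways a source could decide isBacklog. */
final class BacklogStrategiesSketch {

    /** Assumed phases of a CDC-style source: snapshot first, then changelog. */
    enum CdcPhase { SNAPSHOT, CHANGELOG }

    private BacklogStrategiesSketch() {}

    /** State-based strategy: the source claims backlog while it reads the snapshot. */
    static boolean isBacklogByState(CdcPhase phase) {
        return phase == CdcPhase.SNAPSHOT;
    }

    /** Metric-based strategy: backlog when busy time per second exceeds the threshold. */
    static boolean isBacklogByMetric(double busyTimeMsPerSecond, double userSpecifiedThreshold) {
        return busyTimeMsPerSecond > userSpecifiedThreshold;
    }
}
```

Neither strategy needs a watermark, which is why they are candidates for pure processing-time jobs.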
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> As for the strategies proposed in this FLIP, they
>> > rely
>> > > on
>> > > > > > > > generated
>> > > > > > > > > > > >>>>>> watermarks. Therefore, if a user intends for
>> the
>> > job
>> > > > to
>> > > > > > > detect
>> > > > > > > > > > > >> backlog
>> > > > > > > > > > > >>>>>> status based on watermark, it is necessary to
>> > > generate
>> > > > > the
>> > > > > > > > > > > >> watermark.
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> 3. I'm afraid I'm not fully grasping your
>> > question.
>> > > > From
>> > > > > > my
>> > > > > > > > > > > >>>> understanding,
>> > > > > > > > > > > >>>>>> it should work in both cases. When event times
>> are
>> > > > close
>> > > > > > to
>> > > > > > > > the
>> > > > > > > > > > > >>>> processing
>> > > > > > > > > > > >>>>>> time, resulting in watermarks close to the
>> > > processing
>> > > > > > time,
>> > > > > > > > the
>> > > > > > > > > > > >> job is
>> > > > > > > > > > > >>>> not
>> > > > > > > > > > > >>>>>> processing backlog data. On the other hand,
>> when
>> > > event
>> > > > > > times
>> > > > > > > > are
>> > > > > > > > > > > >> far
>> > > > > > > > > > > >>>> from
>> > > > > > > > > > > >>>>>> processing time, causing watermarks to also be
>> > > > distant,
>> > > > > if
>> > > > > > > the
>> > > > > > > > > lag
>> > > > > > > > > > > >>>>>> surpasses the defined threshold, the job is
>> > > considered
>> > > > > > > > > processing
>> > > > > > > > > > > >>>> backlog
>> > > > > > > > > > > >>>>>> data.
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>> Best,
>> > > > > > > > > > > >>>>>> Xuannan
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>>
>> > > > > > > > > > > >>>>>>> On Aug 31, 2023, at 02:56, Jing Ge
>> > > > > > > > <j...@ververica.com.INVALID
>> > > > > > > > > >
>> > > > > > > > > > > >>>> wrote:
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> Hi Xuannan,
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> Thanks for the clarification. That is the part
>> > > where
>> > > > I
>> > > > > am
>> > > > > > > > > trying
>> > > > > > > > > > > >> to
>> > > > > > > > > > > >>>>>>> understand your thoughts. I have some
>> follow-up
>> > > > > > questions:
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> 1. It depends strongly on the
>> watermarkStrategy
>> > and
>> > > > how
>> > > > > > > > > > > >> customized
>> > > > > > > > > > > >>>>>>> watermark generation looks like. It mixes
>> > business
>> > > > > logic
>> > > > > > > with
>> > > > > > > > > > > >>>> technical
>> > > > > > > > > > > >>>>>>> implementation and technical data processing
>> > mode.
>> > > > The
>> > > > > > > value
>> > > > > > > > of
>> > > > > > > > > > > >> the
>> > > > > > > > > > > >>>>>>> watermark lag threshold must be set very
>> > carefully.
>> > > > If
>> > > > > > the
>> > > > > > > > > value
>> > > > > > > > > > > >> is
>> > > > > > > > > > > >>>> too
>> > > > > > > > > > > >>>>>>> small, then any time the watermark generation
>> > > logic
>> > > > is
>> > > > > > > > > > > >>>> changed (business
>> > > > > > > > > > > >>>>>>> logic changes lead to the threshold getting
>> > > > exceeded),
>> > > > > > the
>> > > > > > > > same
>> > > > > > > > > > > >> job
>> > > > > > > > > > > >>>> might
>> > > > > > > > > > > >>>>>>> be running surprisingly in backlog processing
>> > mode,
>> > > > > i.e.
>> > > > > > a
>> > > > > > > > > > > >> butterfly
>> > > > > > > > > > > >>>>>>> effect. Comprehensive documentation is
>> required
>> > > to
>> > > > > > avoid
>> > > > > > > > any
>> > > > > > > > > > > >>>> confusion
>> > > > > > > > > > > >>>>>>> for the users.
>> > > > > > > > > > > >>>>>>> 2. Like Jark already mentioned, use cases
>> that do
>> > > not
>> > > > > > have
>> > > > > > > > > > > >>>> watermarks,
>> > > > > > > > > > > >>>>>>> like pure processing-time based stream
>> > > processing[1]
>> > > > > are
>> > > > > > > not
>> > > > > > > > > > > >>>> covered. It
>> > > > > > > > > > > >>>>>> is
>> > > > > > > > > > > >>>>>>> more or less a trade-off solution that does
>> not
>> > > > support
>> > > > > > > such
>> > > > > > > > > use
>> > > > > > > > > > > >>>> cases
>> > > > > > > > > > > >>>>>> and
>> > > > > > > > > > > >>>>>>> appropriate documentation is required. Forcing
>> > them
>> > > > to
>> > > > > > > > > explicitly
>> > > > > > > > > > > >>>>>> generate
>> > > > > > > > > > > >>>>>>> watermarks that are never needed just because
>> of
>> > > this
>> > > > > > does
>> > > > > > > > not
>> > > > > > > > > > > >> sound
>> > > > > > > > > > > >>>>>> like a
>> > > > > > > > > > > >>>>>>> proper solution.
>> > > > > > > > > > > >>>>>>> 3. If I am not mistaken, it only works for use
>> > > cases
>> > > > > > where
>> > > > > > > > > event
>> > > > > > > > > > > >>>> times
>> > > > > > > > > > > >>>>>> are
>> > > > > > > > > > > >>>>>>> very close to the processing times, because
>> the
>> > > wall
>> > > > > > clock
>> > > > > > > is
>> > > > > > > > > > > >> used to
>> > > > > > > > > > > >>>>>>> calculate the watermark lag and the watermark
>> is
>> > > > > > generated
>> > > > > > > > > based
>> > > > > > > > > > > >> on
>> > > > > > > > > > > >>>> the
>> > > > > > > > > > > >>>>>>> event time.
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> Best regards,
>> > > > > > > > > > > >>>>>>> Jing
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> [1] https://github.com/apache/flink/blob/2c50b4e956305426f478b726d4de4a640a16b810/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L236
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>> On Wed, Aug 30, 2023 at 4:06 AM Xuannan Su <
>> > > > > > > > > > > >> suxuanna...@gmail.com>
>> > > > > > > > > > > >>>>>> wrote:
>> > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > >>>>>>>> Hi Jing,
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>> Thank you for the suggestion.
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>> The definition of watermark lag is the same
>> as
>> > the
>> > > > > > > > > watermarkLag
>> > > > > > > > > > > >>>> metric
>> > > > > > > > > > > >>>>>> in
>> > > > > > > > > > > >>>>>>>> FLIP-33[1]. More specifically, the watermark
>> lag
>> > > > > > > calculation
>> > > > > > > > > is
>> > > > > > > > > > > >>>>>> computed at
>> > > > > > > > > > > >>>>>>>> the time when a watermark is emitted
>> downstream
>> > in
>> > > > the
>> > > > > > > > > following
>> > > > > > > > > > > >>>> way:
>> > > > > > > > > > > >>>>>>>> watermarkLag = CurrentTime - Watermark. I
>> have
>> > > added
>> > > > > > this
>> > > > > > > > > > > >>>> description to
>> > > > > > > > > > > >>>>>>>> the FLIP.
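
The definition quoted above (watermarkLag = CurrentTime - Watermark, evaluated when a watermark is emitted downstream) can be sketched in plain Java as follows. This is only an illustration and not Flink code: the class and method names are hypothetical, and treating a lag strictly greater than the threshold as backlog is an assumption.

```java
import java.time.Duration;

/** Hypothetical helper, not part of Flink: illustrates watermarkLag = CurrentTime - Watermark. */
final class WatermarkLagSketch {

    private WatermarkLagSketch() {}

    /** Lag in milliseconds, computed at the moment a watermark is emitted downstream. */
    static long watermarkLag(long currentTimeMillis, long watermarkMillis) {
        return currentTimeMillis - watermarkMillis;
    }

    /** Assumed rule: the source is processing backlog when the lag exceeds the threshold. */
    static boolean isProcessingBacklog(
            long currentTimeMillis, long watermarkMillis, Duration threshold) {
        return watermarkLag(currentTimeMillis, watermarkMillis) > threshold.toMillis();
    }
}
```

For example, with a one-hour threshold, a watermark that trails the wall clock by two hours would count as backlog, while one that trails by a second would not.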
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>> I hope this addresses your concern.
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>> Best,
>> > > > > > > > > > > >>>>>>>> Xuannan
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > >>>>>>>>> On Aug 28, 2023, at 01:04, Jing Ge
>> > > > > > > > > <j...@ververica.com.INVALID
>> > > > > > > > > > > >>>
>> > > > > > > > > > > >>>> wrote:
>> > > > > > > > > > > >>>>>>>>>
>> > > > > > > > > > > >>>>>>>>> Hi Xuannan,
>> > > > > > > > > > > >>>>>>>>>
>> > > > > > > > > > > >>>>>>>>> Thanks for the proposal. +1 for me.
>> > > > > > > > > > > >>>>>>>>>
>> > > > > > > > > > > >>>>>>>>> There is one tiny thing that I am not sure
>> if I
>> > > > > > > understand
>> > > > > > > > it
>> > > > > > > > > > > >>>>>> correctly.
>> > > > > > > > > > > >>>>>>>>> Since there will be many different
>> > > > > WatermarkStrategies
>> > > > > > > and
>> > > > > > > > > > > >>>> different
>> > > > > > > > > > > >>>>>>>>> WatermarkGenerators, could you please update
>> > the
>> > > > FLIP
>> > > > > > and
>> > > > > > > > add
>> > > > > > > > > > > >> the
>> > > > > > > > > > > >>>>>>>>> description of how the watermark lag is
>> > > calculated
>> > > > > > > exactly?
>> > > > > > > > > > > >> E.g.
>> > > > > > > > > > > >>>>>>>> Watermark
>> > > > > > > > > > > >>>>>>>>> lag = A - B, where A is the timestamp of the
>> > > > watermark
>> > > > > > > > emitted
>> > > > > > > > > > > >> to the
>> > > > > > > > > > > >>>>>>>>> downstream and B is....(this is the part I
>> am
>> > not
>> > > > > > really
>> > > > > > > > sure
>> > > > > > > > > > > >> after
>> > > > > > > > > > > >>>>>>>> reading
>> > > > > > > > > > > >>>>>>>>> the FLIP).
>> > > > > > > > > > > >>>>>>>>>
>> > > > > > > > > > > >>>>>>>>> Best regards,
>> > > > > > > > > > > >>>>>>>>> Jing
>> > > > > > > > > > > >>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>
>> > > > > > > > > > > >>>>>>>>> On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su <
>> > > > > > > > > > > >> suxuanna...@gmail.com>
>> > > > > > > > > > > >>>>>>>> wrote:
>> > > > > > > > > > > >>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>> Hi Jark,
>> > > > > > > > > > > >>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>> Thanks for the comments.
>> > > > > > > > > > > >>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>> I agree that the current solution cannot
>> > support
>> > > > > jobs
>> > > > > > > that
>> > > > > > > > > > > >> cannot
>> > > > > > > > > > > >>>>>> define
>> > > > > > > > > > > >>>>>>>>>> watermarks. However, after considering the
>> > > > > > > > > > > >> pending-record-based
>> > > > > > > > > > > >>>>>>>> solution, I
>> > > > > > > > > > > >>>>>>>>>> believe the current solution is superior
>> for
>> > the
>> > > > > > target
>> > > > > > > > use
>> > > > > > > > > > > >> case
>> > > > > > > > > > > >>>> as it
>> > > > > > > > > > > >>>>>>>> is
>> > > > > > > > > > > >>>>>>>>>> more intuitive for users. The backlog
>> status
>> > > gives
>> > > > > > users
>> > > > > > > > the
>> > > > > > > > > > > >>>> ability
>> > > > > > > > > > > >>>>>> to
>> > > > > > > > > > > >>>>>>>>>> balance between throughput and latency.
>> Making
>> > > > this
>> > > > > > > > > trade-off
>> > > > > > > > > > > >>>> decision
>> > > > > > > > > > > >>>>>>>>>> based on the watermark lag is more
>> intuitive
>> > > from
>> > > > > the
>> > > > > > > > user's
>> > > > > > > > > > > >>>>>>>> perspective.
>> > > > > > > > > > > >>>>>>>>>> For instance, a user can decide that if the
>> > job
>> > > > lags
>> > > > > > > > behind
>> > > > > > > > > > > >> the
>> > > > > > > > > > > >>>>>> current
>> > > > > > > > > > > >>>>>>>>>> time by more than 1 hour, the result is not
>> > > > usable.
>> > > > > In
>> > > > > > > > that
>> > > > > > > > > > > >> case,
>> > > > > > > > > > > >>>> we
>> > > > > > > > > > > >>>>>> can
>> > > > > > > > > > > >>>>>>>>>> optimize for throughput when the data lags
>> > > behind
>> > > > by
>> > > > > > > more
>> > > > > > > > > > > >> than an
>> > > > > > > > > > > >>>>>> hour.
>> > > > > > > > > > > >>>>>>>>>> With the pending-record-based solution,
>> it's
>> > > > > > challenging
>> > > > > > > > for
>> > > > > > > > > > > >>>> users to
>> > > > > > > > > > > >>>>>>>>>> determine when to optimize for throughput
>> and
>> > > when
>> > > > > to
>> > > > > > > > > > > >> prioritize
>> > > > > > > > > > > >>>>>>>> latency.
>> > > > > > > > > > > >>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>> Regarding the limitations of the
>> > watermark-based
>> > > > > > > solution:
>> > > > > > > > > > > >>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>> 1. The current solution can support jobs
>> with
>> > > > > sources
>> > > > > > > that
>> > > > > > > > > > > >> have
>> > > > > > > > > > > >>>> event
>> > > > > > > > > > > >>>>>>>>>> time. Users can always define a watermark
>> at
>> > the
>> > > > > > source
>> > > > > > > > > > > >> operator,
>> > > > > > > > > > > >>>> even
>> > > > > > > > > > > >>>>>>>> if
>> > > > > > > > > > > >>>>>>>>>> it's not used by downstream operators,
>> such as
>> > > > > > streaming
>> > > > > > > > > join
>> > > > > > > > > > > >> and
>> > > > > > > > > > > >>>>>>>> unbounded
>> > > > > > > > > > > >>>>>>>>>> aggregate.
>> > > > > > > > > > > >>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>> 2. I don't believe it's accurate to say that
>> > the
>> > > > > > > watermark
>> > > > > > > > > lag
>> > > > > > > > > > > >> will
>> > > > > > > > > > > >>>>>> keep
>> > > > > > > > > > > >>>>>>>>>> increasing if no data is generated in
>> Kafka.
>> > The
>> > > > > > > watermark
>> > > > > > > > > > > >> lag and
>> > > > > > > > > > > >>>>>>>> backlog
>> > > > > > > > > > > >>>>>>>>>> status are determined at the moment when
>> the
>> > > > > watermark
>> > > > > > > is
>> > > > > > > > > > > >> emitted
>> > > > > > > > > > > >>>> to
>> > > > > > > > > > > >>>>>> the
>> > > > > > > > > > > >>>>>>>>>> downstream operator. If no data is emitted
>> > from
>> > > > the
>> > > > > > > > source,
>> > > > > > > > > > > >> the
>> > > > > > > > > > > >>>>>>>> watermark
>> > > > > > > > > > > >>>>>>>>>> lag and backlog status will not be
>> updated. If
>> > > the
>> > > > > > > > > > > >>>> WatermarkStrategy
>> > > > > > > > > > > >>>>>>>> with
>> > > > > > > > > > > >>>>>>>>>> idleness is used, the source becomes
>> > non-backlog
>> > > > > when
>> > > > > > it
>> > > > > > > > > > > >> becomes
>> > > > > > > > > > > >>>> idle.
>> > > > > > > > > > > >>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>> 3. I think watermark lag is more intuitive
>> to
>> > > > > > determine
>> > > > > > > > if a
>> > > > > > > > > > > >> job
>> > > > > > > > > > > >>>> is
>> > > > > > > > > > > >>>>>>>>>> processing backlog data. Even when using
>> > pending
>> > > > > > > records,
>> > > > > > > > it
>> > > > > > > > > > > >>>> faces a
>> > > > > > > > > > > >>>>>>>>>> similar issue. For example, if the source
>> has
>> > 1K
>> > > > > > pending
>> > > > > > > > > > > >> records,
>> > > > > > > > > > > >>>>>> those
>> > > > > > > > > > > >>>>>>>>>> records can span from 1 day to 1 hour to 1
>> > > > second.
>> > > > > If
>> > > > > > > the
>> > > > > > > > > > > >> records
>> > > > > > > > > > > >>>>>> span
>> > > > > > > > > > > >>>>>>>> 1
>> > > > > > > > > > > >>>>>>>>>> day, it's probably best to optimize for
>> > > > throughput.
>> > > > > If
>> > > > > > > > they
>> > > > > > > > > > > >> span 1
>> > > > > > > > > > > >>>>>>>> hour, it
>> > > > > > > > > > > >>>>>>>>>> depends on the business logic. If they
>> span 1
>> > > > > second,
>> > > > > > > > > > > >> optimizing
>> > > > > > > > > > > >>>> for
>> > > > > > > > > > > >>>>>>>>>> latency is likely the better choice.
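
Point 2 above (the status is re-evaluated only when a watermark is emitted, and an idle source becomes non-backlog) can be sketched in plain Java as follows. This is only an illustration and not Flink code: `BacklogStatusTracker` and its methods are hypothetical, and starting in the non-backlog state before the first watermark is an assumption.

```java
/** Hypothetical tracker, not part of Flink: status changes only on watermark emission or idleness. */
final class BacklogStatusTracker {

    private final long thresholdMillis;
    private boolean backlog; // assumed to start as false until the first watermark is emitted

    BacklogStatusTracker(long thresholdMillis) {
        this.thresholdMillis = thresholdMillis;
    }

    /** Re-evaluates the status at the moment a watermark is emitted downstream. */
    void onWatermarkEmitted(long currentTimeMillis, long watermarkMillis) {
        backlog = (currentTimeMillis - watermarkMillis) > thresholdMillis;
    }

    /** With an idleness-aware WatermarkStrategy, an idle source is treated as non-backlog. */
    void onSourceIdle() {
        backlog = false;
    }

    /** Returns the last computed status; it does not drift while no watermark is emitted. */
    boolean isProcessingBacklog() {
        return backlog;
    }
}
```

Because nothing is recomputed between emissions, wall-clock time passing on a quiet source does not by itself flip the status in this sketch.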
>> > > > > > > > > > > >>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>> In summary, I believe the watermark-based
>> > > solution
>> > > > > is
>> > > > > > a
>> > > > > > > > > > > >> superior
>> > > > > > > > > > > >>>>>> choice
>> > > > > > > > > > > >>>>>>>>>> for the target use case where
>> watermark/event
>> > > time
>> > > > > can
>> > > > > > > be
>> > > > > > > > > > > >> defined.
>> > > > > > > > > > > >>>>>>>>>> Additionally, I haven't come across a
>> scenario
>> > > > that
>> > > > > > > > requires
>> > > > > > > > > > > >>>>>> low-latency
>> > > > > > > > > > > >>>>>>>>>> processing and reads from a source that
>> cannot
>> > > > > define
>> > > > > > > > > > > >> watermarks.
>> > > > > > > > > > > >>>> If
>> > > > > > > > > > > >>>>>> we
>> > > > > > > > > > > >>>>>>>>>> encounter such a use case, we can create
>> > another
>> > > > > FLIP
>> > > > > > to
>> > > > > > > > > > > >> address
>> > > > > > > > > > > >>>> those
>> > > > > > > > > > > >>>>>>>>>> needs in the future. What do you think?
>> > > > > > > > > > > >>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>> Best,
>> > > > > > > > > > > >>>>>>>>>> Xuannan
>> > > > > > > > > > > >>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>> On Aug 20, 2023, at 23:27, Jark Wu <
>> > > > > imj...@gmail.com
>> > > > > > > > > > > >> <mailto:
>> > > > > > > > > > > >>>>>>>>>> imj...@gmail.com>> wrote:
>> > > > > > > > > > > >>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>> Hi Xuannan,
>> > > > > > > > > > > >>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>> Thanks for opening this discussion.
>> > > > > > > > > > > >>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>> This current proposal may work in the
>> > mentioned
>> > > > > > > watermark
>> > > > > > > > > > > >> cases.
>> > > > > > > > > > > >>>>>>>>>>> However, it seems this is not a general
>> > > solution
>> > > > > for
>> > > > > > > > > sources
>> > > > > > > > > > > >> to
>> > > > > > > > > > > >>>>>>>> determine
>> > > > > > > > > > > >>>>>>>>>>> "isProcessingBacklog".
>> > > > > > > > > > > >>>>>>>>>>> From my point of view, there are 3
>> > limitations
>> > > of
>> > > > > the
>> > > > > > > > > current
>> > > > > > > > > > > >>>>>> proposal:
>> > > > > > > > > > > >>>>>>>>>>> 1. It doesn't cover jobs that don't have
>> > > > > > > > > watermark/event-time
>> > > > > > > > > > > >>>>>> defined,
>> > > > > > > > > > > >>>>>>>>>>> for example streaming join and unbounded
>> > > > aggregate.
>> > > > > > We
>> > > > > > > > may
>> > > > > > > > > > > >> still
>> > > > > > > > > > > >>>> need
>> > > > > > > > > > > >>>>>>>> to
>> > > > > > > > > > > >>>>>>>>>>> figure out solutions for them.
>> > > > > > > > > > > >>>>>>>>>>> 2. Watermark lag cannot be trusted,
>> because
>> > it
>> > > > > > > increases
>> > > > > > > > > > > >>>> without bound
>> > > > > > > > > > > >>>>>> if
>> > > > > > > > > > > >>>>>>>> no
>> > > > > > > > > > > >>>>>>>>>>> data is generated in Kafka.
>> > > > > > > > > > > >>>>>>>>>>> But in this case, there is no backlog at
>> all.
>> > > > > > > > > > > >>>>>>>>>>> 3. Watermark lag does not reliably reflect the
>> > amount
>> > > of
>> > > > > > > > backlog.
>> > > > > > > > > > > >> If the
>> > > > > > > > > > > >>>>>>>>>> watermark
>> > > > > > > > > > > >>>>>>>>>>> lag is 1 day or 1 hour or 1 second,
>> > > > > > > > > > > >>>>>>>>>>> there is possibly only 1 pending record
>> > there,
>> > > > > which
>> > > > > > > > means
>> > > > > > > > > no
>> > > > > > > > > > > >>>> backlog
>> > > > > > > > > > > >>>>>>>> at
>> > > > > > > > > > > >>>>>>>>>>> all.
>> > > > > > > > > > > >>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>> Therefore, IMO, watermark may not be the
>> ideal
>> > > > > metric
>> > > > > > > used
>> > > > > > > > > to
>> > > > > > > > > > > >>>>>> determine
>> > > > > > > > > > > >>>>>>>>>>> "isProcessingBacklog".
>> > > > > > > > > > > >>>>>>>>>>> What we need is something that reflects
>> the
>> > > > number
>> > > > > of
>> > > > > > > > > records
>> > > > > > > > > > > >>>>>>>> unprocessed
>> > > > > > > > > > > >>>>>>>>>>> by the job.
>> > > > > > > > > > > >>>>>>>>>>> Actually, that is the "pendingRecords"
>> metric
>> > > > > > proposed
>> > > > > > > in
>> > > > > > > > > > > >>>> FLIP-33 and
>> > > > > > > > > > > >>>>>>>> has
>> > > > > > > > > > > >>>>>>>>>>> been implemented by Kafka source.
>> > > > > > > > > > > >>>>>>>>>>> Did you consider using "pendingRecords"
>> > metric
>> > > to
>> > > > > > > > determine
>> > > > > > > > > > > >>>>>>>>>>> "isProcessingBacklog"?
>> > > > > > > > > > > >>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>> Best,
>> > > > > > > > > > > >>>>>>>>>>> Jark
>> > > > > > > > > > > >>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>> > > > > > > > > > > >>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>> On Tue, 15 Aug 2023 at 12:04, Xintong
>> Song <
>> > > > > > > > > > > >>>> tonysong...@gmail.com
>> > > > > > > > > > > >>>>>>>>>> <mailto:tonysong...@gmail.com>> wrote:
>> > > > > > > > > > > >>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>> Sounds good to me.
>> > > > > > > > > > > >>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>> It is true that, if we are introducing
>> the
>> > > > > > generalized
>> > > > > > > > > > > >>>> watermark,
>> > > > > > > > > > > >>>>>>>> there
>> > > > > > > > > > > >>>>>>>>>>>> will be other watermark related concepts
>> /
>> > > > > > > > configurations
>> > > > > > > > > > > >> that
>> > > > > > > > > > > >>>> need
>> > > > > > > > > > > >>>>>> to
>> > > > > > > > > > > >>>>>>>>>> be
>> > > > > > > > > > > >>>>>>>>>>>> updated anyway.
>> > > > > > > > > > > >>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>> Best,
>> > > > > > > > > > > >>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>> Xintong
>> > > > > > > > > > > >>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>> On Tue, Aug 15, 2023 at 11:30 AM Xuannan
>> Su
>> > <
>> > > > > > > > > > > >>>> suxuanna...@gmail.com
>> > > > > > > > > > > >>>>>>>>>> <mailto:suxuanna...@gmail.com>> wrote:
>> > > > > > > > > > > >>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>> Hi Xintong,
>> > > > > > > > > > > >>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>> Thank you for your suggestion.
>> > > > > > > > > > > >>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>> After considering the idea of using a
>> > general
>> > > > > > > > > configuration
>> > > > > > > > > > > >>>> key, I
>> > > > > > > > > > > >>>>>>>>>> think
>> > > > > > > > > > > >>>>>>>>>>>>> it may not be a good idea for the
>> reasons
>> > > > below.
>> > > > > > > > > > > >>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>> While I agree that using a more general
>> > > > > > configuration
>> > > > > > > > key
>> > > > > > > > > > > >>>> provides
>> > > > > > > > > > > >>>>>> us
>> > > > > > > > > > > >>>>>>>>>>>> with
>> > > > > > > > > > > >>>>>>>>>>>>> the flexibility to switch to other
>> > approaches
>> > > > to
>> > > > > > > > > calculate
>> > > > > > > > > > > >> the
>> > > > > > > > > > > >>>> lag
>> > > > > > > > > > > >>>>>> in
>> > > > > > > > > > > >>>>>>>>>> the
>> > > > > > > > > > > >>>>>>>>>>>>> future, the downside is that it may
>> cause
>> > > > > confusion
>> > > > > > > for
>> > > > > > > > > > > >> users.
>> > > > > > > > > > > >>>> We
>> > > > > > > > > > > >>>>>>>>>>>> currently
>> > > > > > > > > > > >>>>>>>>>>>>> have fetchEventTimeLag,
>> emitEventTimeLag,
>> > and
>> > > > > > > > > watermarkLag
>> > > > > > > > > > > >> in
>> > > > > > > > > > > >>>> the
>> > > > > > > > > > > >>>>>>>>>> source,
>> > > > > > > > > > > >>>>>>>>>>>>> and it is not clear which specific lag
>> we
>> > are
>> > > > > > > referring
>> > > > > > > > > to.
>> > > > > > > > > > > >>>> With
>> > > > > > > > > > > >>>>>> the
>> > > > > > > > > > > >>>>>>>>>>>>> potential introduction of the
>> Generalized
>> > > > > Watermark
>> > > > > > > > > > > >> mechanism
>> > > > > > > > > > > >>>> in
>> > > > > > > > > > > >>>>>> the
>> > > > > > > > > > > >>>>>>>>>>>>> future, if I understand correctly, a
>> > > watermark
>> > > > > > won't
>> > > > > > > > > > > >>>> necessarily
>> > > > > > > > > > > >>>>>> need
>> > > > > > > > > > > >>>>>>>>>> to
>> > > > > > > > > > > >>>>>>>>>>>> be
>> > > > > > > > > > > >>>>>>>>>>>>> a timestamp. I am concerned that the
>> general
>> > > > > > > > configuration
>> > > > > > > > > > > >> key
>> > > > > > > > > > > >>>> may
>> > > > > > > > > > > >>>>>> not
>> > > > > > > > > > > >>>>>>>>>> be
>> > > > > > > > > > > >>>>>>>>>>>>> enough to cover all the use cases and we
>> > will
>> > > > need
>> > > > > > to
>> > > > > > > > > > > >> introduce
>> > > > > > > > > > > >>>> a
>> > > > > > > > > > > >>>>>>>>>> general
>> > > > > > > > > > > >>>>>>>>>>>>> way to determine the backlog status
>> > > regardless.
>> > > > > > > > > > > >>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>> For the reasons above, I prefer
>> introducing
>> > > the
>> > > > > > > > > > > >> configuration
>> > > > > > > > > > > >>>> as
>> > > > > > > > > > > >>>>>> is,
>> > > > > > > > > > > >>>>>>>>>> and
>> > > > > > > > > > > >>>>>>>>>>>>> change it later with a deprecation
>> > > process
>> > > > or
>> > > > > > > > > migration
>> > > > > > > > > > > >>>>>> process.
>> > > > > > > > > > > >>>>>>>>>> What
>> > > > > > > > > > > >>>>>>>>>>>>> do you think?
>> > > > > > > > > > > >>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>> Best,
>> > > > > > > > > > > >>>>>>>>>>>>> Xuannan
>> > > > > > > > > > > >>>>>>>>>>>>> On Aug 14, 2023, 14:09 +0800, Xintong
>> Song
>> > <
>> > > > > > > > > > > >>>> tonysong...@gmail.com
>> > > > > > > > > > > >>>>>>>>>> <mailto:tonysong...@gmail.com>>,
>> > > > > > > > > > > >>>>>>>>>>>> wrote:
>> > > > > > > > > > > >>>>>>>>>>>>>> Thanks for the explanation.
>> > > > > > > > > > > >>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>> I wonder if it makes sense to not
>> expose
>> > > this
>> > > > > > detail
>> > > > > > > > via
>> > > > > > > > > > > >> the
>> > > > > > > > > > > >>>>>>>>>>>>> configuration
>> > > > > > > > > > > >>>>>>>>>>>>>> option. To be specific, I suggest not
>> > > > mentioning
>> > > > > > the
>> > > > > > > > > > > >>>> "watermark"
>> > > > > > > > > > > >>>>>>>>>>>> keyword
>> > > > > > > > > > > >>>>>>>>>>>>> in
>> > > > > > > > > > > >>>>>>>>>>>>>> the configuration key and description.
>> > > > > > > > > > > >>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>> - From the users' perspective, I think
>> > they
>> > > > only
>> > > > > > > need
>> > > > > > > > to
>> > > > > > > > > > > >> know
>> > > > > > > > > > > >>>>>>>> that when there's
>> > > > > > > > > > > >>>>>>>>>> a
>> > > > > > > > > > > >>>>>>>>>>>>>> lag higher than the given threshold,
>> Flink
>> > > > will
>> > > > > > > > consider
>> > > > > > > > > > > >>>> latency
>> > > > > > > > > > > >>>>>> of
>> > > > > > > > > > > >>>>>>>>>>>>>> individual records as less important
>> and
>> > > > > > prioritize
>> > > > > > > > > > > >> throughput
>> > > > > > > > > > > >>>>>> over
>> > > > > > > > > > > >>>>>>>>>> it.
>> > > > > > > > > > > >>>>>>>>>>>>>> They don't really need the details of
>> how
>> > > the
>> > > > > lags
>> > > > > > > are
>> > > > > > > > > > > >>>> calculated.
>> > > > > > > > > > > >>>>>>>>>>>>>> - For the internal implementation, I
>> also
>> > > > think
>> > > > > > > using
>> > > > > > > > > > > >>>> watermark
>> > > > > > > > > > > >>>>>> lags
>> > > > > > > > > > > >>>>>>>>>> is
>> > > > > > > > > > > >>>>>>>>>>>>>> a good idea, for the reasons you've
>> > already
>> > > > > > > mentioned.
>> > > > > > > > > > > >>>> However,
>> > > > > > > > > > > >>>>>> it's
>> > > > > > > > > > > >>>>>>>>>>>> not
>> > > > > > > > > > > >>>>>>>>>>>>>> the only possible option. Hiding this
>> > detail
>> > > > > from
>> > > > > > > > users
>> > > > > > > > > > > >> would
>> > > > > > > > > > > >>>> give
>> > > > > > > > > > > >>>>>>>> us
>> > > > > > > > > > > >>>>>>>>>>>> the
>> > > > > > > > > > > >>>>>>>>>>>>>> flexibility to switch to other
>> approaches
>> > if
>> > > > > > needed
>> > > > > > > in
>> > > > > > > > > > > >> future.
>> > > > > > > > > > > >>>>>>>>>>>>>> - We are currently working on designing
>> > the
>> > > > > > > > > > > >> ProcessFunction
>> > > > > > > > > > > >>>> API
>> > > > > > > > > > > >>>>>>>>>>>>>> (consider it as a DataStream API V2).
>> > > There's
>> > > > an
>> > > > > > > idea
>> > > > > > > > to
>> > > > > > > > > > > >>>>>> introduce a
>> > > > > > > > > > > >>>>>>>>>>>>>> Generalized Watermark mechanism, where
>> > > > basically
>> > > > > > the
>> > > > > > > > > > > >>>> watermark can
>> > > > > > > > > > > >>>>>>>> be
>> > > > > > > > > > > >>>>>>>>>>>>>> anything that needs to travel along the
>> > > > > data-flow
>> > > > > > > with
>> > > > > > > > > > > >> certain
>> > > > > > > > > > > >>>>>>>>>>>> alignment
>> > > > > > > > > > > >>>>>>>>>>>>>> strategies, and event time watermark
>> would
>> > > be
>> > > > > one
>> > > > > > > > > specific
>> > > > > > > > > > > >>>> case of
>> > > > > > > > > > > >>>>>>>> it.
>> > > > > > > > > > > >>>>>>>>>>>>> This
>> > > > > > > > > > > >>>>>>>>>>>>>> is still an idea and has not been
>> > discussed
>> > > > and
>> > > > > > > agreed
>> > > > > > > > > on
>> > > > > > > > > > > >> by
>> > > > > > > > > > > >>>> the
>> > > > > > > > > > > >>>>>>>>>>>>> community,
>> > > > > > > > > > > >>>>>>>>>>>>>> and we are preparing a FLIP for it.
>> But if
>> > > we
>> > > > > are
>> > > > > > > > going
>> > > > > > > > > > > >> for
>> > > > > > > > > > > >>>> it,
>> > > > > > > > > > > >>>>>> the
>> > > > > > > > > > > >>>>>>>>>>>>> concept
>> > > > > > > > > > > >>>>>>>>>>>>>> "watermark-lag-threshold" could be
>> > > ambiguous.
>> > > > > > > > > > > >>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>> I do not intend to block the FLIP on
>> this.
>> > > I'd
>> > > > > > also
>> > > > > > > be
>> > > > > > > > > > > >> fine
>> > > > > > > > > > > >>>> with
>> > > > > > > > > > > >>>>>>>>>>>>>> introducing the configuration as is,
>> and
>> > > > > changing
>> > > > > > it
>> > > > > > > > > > > >> later, if
>> > > > > > > > > > > >>>>>>>> needed,
>> > > > > > > > > > > >>>>>>>>>>>>> with
>> > > > > > > > > > > >>>>>>>>>>>>>> a regular deprecation and migration
>> > process.
>> > > > > Just
>> > > > > > > > making
>> > > > > > > > > > > >> my
>> > > > > > > > > > > >>>>>>>>>>>> suggestions.
>> > > > > > > > > > > >>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>> Best,
>> > > > > > > > > > > >>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>> Xintong
>> > > > > > > > > > > >>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>> On Mon, Aug 14, 2023 at 12:00 PM
>> Xuannan
>> > Su
>> > > <
>> > > > > > > > > > > >>>>>> suxuanna...@gmail.com
>> > > > > > > > > > > >>>>>>>>>> <mailto:suxuanna...@gmail.com>>
>> > > > > > > > > > > >>>>>>>>>>>>> wrote:
>> > > > > > > > > > > >>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>>> Hi Xintong,
>> > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>>> Thanks for the reply.
>> > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>>> I have considered using the timestamp
>> in
>> > > the
>> > > > > > > records
>> > > > > > > > to
>> > > > > > > > > > > >>>> determine
>> > > > > > > > > > > >>>>>>>> the
>> > > > > > > > > > > >>>>>>>>>>>>>>> backlog status, and decided to use
>> > > watermark
>> > > > in
>> > > > > > the
>> > > > > > > > > end.
>> > > > > > > > > > > >> By
>> > > > > > > > > > > >>>>>>>>>>>> definition,
>> > > > > > > > > > > >>>>>>>>>>>>>>> watermark is the time progress
>> indication
>> > > in
>> > > > > the
>> > > > > > > data
>> > > > > > > > > > > >>>> stream. It
>> > > > > > > > > > > >>>>>>>>>>>>> indicates
>> > > > > > > > > > > >>>>>>>>>>>>>>> the stream’s event time has
>> progressed to
>> > > > some
>> > > > > > > > specific
>> > > > > > > > > > > >>>> time. On
>> > > > > > > > > > > >>>>>>>> the
>> > > > > > > > > > > >>>>>>>>>>>>> other
>> > > > > > > > > > > >>>>>>>>>>>>>>> hand, timestamp in the records is
>> usually
>> > > > used
>> > > > > to
>> > > > > > > > > > > >> generate
>> > > > > > > > > > > >>>> the
>> > > > > > > > > > > >>>>>>>>>>>>> watermark.
>> > > > > > > > > > > >>>>>>>>>>>>>>> Therefore, it appears more appropriate
>> > and
>> > > > > > > intuitive
>> > > > > > > > to
>> > > > > > > > > > > >>>> calculate
>> > > > > > > > > > > >>>>>>>> the
>> > > > > > > > > > > >>>>>>>>>>>>> event
>> > > > > > > > > > > >>>>>>>>>>>>>>> time lag by watermark and determine
>> the
>> > > > backlog
>> > > > > > > > status.
>> > > > > > > > > > > >> And
>> > > > > > > > > > > >>>> by
>> > > > > > > > > > > >>>>>>>> using
>> > > > > > > > > > > >>>>>>>>>>>>> the
>> > > > > > > > > > > >>>>>>>>>>>>>>> watermark, we can easily deal with the
>> > > > > > out-of-order
>> > > > > > > > and
>> > > > > > > > > > > >> the
>> > > > > > > > > > > >>>>>>>> idleness
>> > > > > > > > > > > >>>>>>>>>>>>> of the
>> > > > > > > > > > > >>>>>>>>>>>>>>> data.
>> > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>>> Please let me know if you have further
>> > > > > questions.
>> > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>>> Best,
>> > > > > > > > > > > >>>>>>>>>>>>>>> Xuannan
>> > > > > > > > > > > >>>>>>>>>>>>>>> On Aug 10, 2023, 20:23 +0800, Xintong
>> > Song
>> > > <
>> > > > > > > > > > > >>>>>> tonysong...@gmail.com
>> > > > > > > > > > > >>>>>>>>>> <mailto:tonysong...@gmail.com>>,
>> > > > > > > > > > > >>>>>>>>>>>>> wrote:
>> > > > > > > > > > > >>>>>>>>>>>>>>>> Thanks for preparing the FLIP,
>> Xuannan.
>> > > > > > > > > > > >>>>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>>>> +1 in general.
>> > > > > > > > > > > >>>>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>>>> A quick question, could you explain
>> why
>> > we
>> > > > are
>> > > > > > > > relying
>> > > > > > > > > > > >> on
>> > > > > > > > > > > >>>> the
>> > > > > > > > > > > >>>>>>>>>>>>> watermark
>> > > > > > > > > > > >>>>>>>>>>>>>>> for
>> > > > > > > > > > > >>>>>>>>>>>>>>>> emitting the record attribute? Why
>> not
>> > use
>> > > > > > > > timestamps
>> > > > > > > > > > > >> in the
>> > > > > > > > > > > >>>>>>>>>>>>> records? I
>> > > > > > > > > > > >>>>>>>>>>>>>>>> don't see any concern in using
>> > watermarks.
>> > > > > Just
>> > > > > > > > > > > >> wondering if
>> > > > > > > > > > > >>>>>>>>>>>> there are
>> > > > > > > > > > > >>>>>>>>>>>>> any
>> > > > > > > > > > > >>>>>>>>>>>>>>>> deep considerations behind this.
>> > > > > > > > > > > >>>>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>>>> Best,
>> > > > > > > > > > > >>>>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>>>> Xintong
>> > > > > > > > > > > >>>>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>>>> On Thu, Aug 3, 2023 at 3:03 PM
>> Xuannan
>> > Su
>> > > <
>> > > > > > > > > > > >>>>>> suxuanna...@gmail.com
>> > > > > > > > > > > >>>>>>>>>> <mailto:suxuanna...@gmail.com>>
>> > > > > > > > > > > >>>>>>>>>>>>> wrote:
>> > > > > > > > > > > >>>>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>>>>> Hi all,
>> > > > > > > > > > > >>>>>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>>>>> I am opening this thread to discuss
>> > > > FLIP-328:
>> > > > > > > Allow
>> > > > > > > > > > > >> source
>> > > > > > > > > > > >>>>>>>>>>>>> operators to
>> > > > > > > > > > > >>>>>>>>>>>>>>>>> determine isProcessingBacklog based
>> on
>> > > > > > watermark
>> > > > > > > > > > > >> lag[1]. We
>> > > > > > > > > > > >>>>>> had
>> > > > > > > > > > > >>>>>>>>>>>>>>> several
>> > > > > > > > > > > >>>>>>>>>>>>>>>>> discussions with Dong Lin about the
>> > > > design,
>> > > > > > and
>> > > > > > > > > thanks
>> > > > > > > > > > > >>>> for all
>> > > > > > > > > > > >>>>>>>>>>>> the
>> > > > > > > > > > > >>>>>>>>>>>>>>>>> valuable advice.
>> > > > > > > > > > > >>>>>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>>>>> The FLIP aims to target the use-case
>> > > where
>> > > > > users
>> > > > > > > > want
>> > > > > > > > > to
>> > > > > > > > > > > >>>> run a
>> > > > > > > > > > > >>>>>>>>>>>> Flink
>> > > > > > > > > > > >>>>>>>>>>>>>>> job to
>> > > > > > > > > > > >>>>>>>>>>>>>>>>> backfill historical data in a high
>> > > > throughput
>> > > > > > > > manner
>> > > > > > > > > > > >> and
>> > > > > > > > > > > >>>>>> continue
>> > > > > > > > > > > >>>>>>>>>>>>>>>>> processing real-time data with low
>> > > latency.
>> > > > > > > > Building
>> > > > > > > > > > > >> upon
>> > > > > > > > > > > >>>> the
>> > > > > > > > > > > >>>>>>>>>>>>> backlog
>> > > > > > > > > > > >>>>>>>>>>>>>>>>> concept introduced in FLIP-309[2],
>> this
>> > > > > > proposal
>> > > > > > > > > > > >> enables
>> > > > > > > > > > > >>>>>> sources
>> > > > > > > > > > > >>>>>>>>>>>> to
>> > > > > > > > > > > >>>>>>>>>>>>>>> report
>> > > > > > > > > > > >>>>>>>>>>>>>>>>> their status of processing backlog
>> > based
>> > > on
>> > > > > the
>> > > > > > > > > > > >> watermark
>> > > > > > > > > > > >>>> lag.
>> > > > > > > > > > > >>>>>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>>>>> We would greatly appreciate any
>> > comments
>> > > or
>> > > > > > > > feedback
>> > > > > > > > > > > >> you
>> > > > > > > > > > > >>>> may
>> > > > > > > > > > > >>>>>> have
>> > > > > > > > > > > >>>>>>>>>>>>> on
>> > > > > > > > > > > >>>>>>>>>>>>>>> this
>> > > > > > > > > > > >>>>>>>>>>>>>>>>> proposal.
>> > > > > > > > > > > >>>>>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>>>>> Best,
>> > > > > > > > > > > >>>>>>>>>>>>>>>>> Xuannan
>> > > > > > > > > > > >>>>>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>>>>>
>> > > > > > > > > > > >>>>>>>>>>>>>>>>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
>> > > > > > > > > > > >>>>>>>>>>>>>>>>> [2]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog