Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-25 Thread Dong Lin
Hi Piotr,

Thanks for the comments. Let me try to explain it below.

Overall, the two competing options differ in how an invocation of
`#setIsProcessingBacklog(false)` affects the backlog status for the given
source (corresponding to the SplitEnumeratorContext instance on which this
method is invoked).

- With my approach, setIsProcessingBacklog(false) merely unsets the effects of
any previous invocation of setIsProcessingBacklog(..) on the given source,
without necessarily forcing the source's backlog status to be false.
- With Jark’s approach, setIsProcessingBacklog(false) forces the source's
backlog status to be false.

There is no practical difference between these two options as of FLIP-309.
However, once we introduce an additional strategy (e.g. a job-level config) to
configure backlog status in FLIP-328, there will be tricky and important
differences between them.

More specifically, let’s say we want to introduce a job-level config such
as “pipeline.backlog.watermark-lag-threshold” as mentioned in FLIP-328:

- With Jark’s approach, if MySQL CDC invokes setIsProcessingBacklog(false)
at the beginning of the “unbounded phase”, then that effectively means
isProcessingBacklog=false even if watermark lag exceeds the configured
threshold, preventing job-level config from taking effect during the
"unbounded phase".
- With my approach, even if MySQL CDC invokes setIsProcessingBacklog(false)
at the beginning of the “unbounded phase”, the source can still have
isProcessingBacklog=true when watermark lag is too high.
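
A minimal sketch of the two semantics, for illustration only. The class
SourceBacklogModel and its method names are hypothetical and are not part of
Flink or of either FLIP; the sketch also assumes a single threshold that
stands in for the job-level config.

```java
import java.time.Duration;

/** Hypothetical model of a single source's backlog status; not actual Flink code. */
final class SourceBacklogModel {
    private final Duration lagThreshold; // stands in for the job-level threshold
    private boolean invoked = false;     // has setIsProcessingBacklog(..) been called?
    private boolean lastValue = false;   // argument of the most recent call

    SourceBacklogModel(Duration lagThreshold) {
        this.lagThreshold = lagThreshold;
    }

    void setIsProcessingBacklog(boolean isProcessingBacklog) {
        invoked = true;
        lastValue = isProcessingBacklog;
    }

    private boolean lagTooHigh(Duration watermarkLag) {
        return watermarkLag.compareTo(lagThreshold) > 0;
    }

    /** Dong's option: false only clears an earlier true; the lag rule still applies. */
    boolean backlogWithUnsetSemantics(Duration watermarkLag) {
        return lastValue || lagTooHigh(watermarkLag);
    }

    /** Jark's option: an explicit false forces the status to false. */
    boolean backlogWithForceSemantics(Duration watermarkLag) {
        if (invoked && !lastValue) {
            return false;
        }
        return lastValue || lagTooHigh(watermarkLag);
    }

    public static void main(String[] args) {
        SourceBacklogModel source = new SourceBacklogModel(Duration.ofMinutes(5));
        source.setIsProcessingBacklog(false); // start of the "unbounded phase"
        Duration lag = Duration.ofMinutes(10);
        System.out.println(source.backlogWithUnsetSemantics(lag)); // true
        System.out.println(source.backlogWithForceSemantics(lag)); // false
    }
}
```

With a 5-minute threshold and a 10-minute lag, the first method reports
backlog=true after setIsProcessingBacklog(false), while the second reports
false, which is the MySQL CDC case described above.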

Would this clarify the difference between these two options?

Regards,
Dong


On Mon, Sep 25, 2023 at 5:15 PM Piotr Nowojski 
wrote:

> Hi Jark and Dong,
>
> I'm a bit confused about the difference between the two competing options.
> Could one of you elaborate what's the difference between:
> > 2) The semantics of `#setIsProcessingBacklog(false)` is that it overrides
> > the effect of the previous invocation (if any) of
> > `#setIsProcessingBacklog(true)` on the given source instance.
>
> and
>
> > 2) The semantics of `#setIsProcessingBacklog(false)` is that the given
> > source instance will have backlog=false.
>
> ?
>
> Best,
> Piotrek
>
> czw., 21 wrz 2023 o 15:28 Dong Lin  napisał(a):
>
> > Hi all,
> >
> > Jark and I discussed this FLIP offline and I will summarize our
> discussion
> > below. It would be great if you could provide your opinion of the
> proposed
> > options.
> >
> > Regarding the target use-cases:
> > - We both agreed that MySQL CDC should have backlog=true when
> watermarkLag
> > is large during the binlog phase.
> > - Dong argued that other streaming sources with watermarkLag defined
> (e.g.
> > Kafka) should also have backlog=true when watermarkLag is large. The
> > pros/cons discussion below assumes this use-case needs to be supported.
> >
> > The 1st option is what is currently proposed in FLIP-328, with the
> > following key characteristics:
> > 1) There is one job-level config (i.e.
> > pipeline.backlog.watermark-lag-threshold) that applies to all sources
> with
> > watermarkLag metric defined.
> > 2) The semantics of `#setIsProcessingBacklog(false)` is that it overrides
> > the effect of the previous invocation (if any) of
> > `#setIsProcessingBacklog(true)` on the given source instance.
> >
> > The 2nd option is what Jark proposed in this email thread, with the
> > following key characteristics:
> > 1) Add source-specific config (both Java API and SQL source property) to
> > every source for which we want to set backlog status based on the
> > watermarkLag metric. For example, we might add separate Java APIs
> > `#setWatermarkLagThreshold`  for MySQL CDC source, HybridSource,
> > KafkaSource, PulsarSource etc.
> > 2) The semantics of `#setIsProcessingBacklog(false)` is that the given
> > source instance will have backlog=false.
> >
> > Here are the key pros/cons of these two options.
> >
> > Cons of the 1st option:
> > 1) The semantics of `#setIsProcessingBacklog(false)` is harder to
> > understand for Flink operator developers than the corresponding semantics
> > in option-2.
> >
> > Cons of the  2nd option:
> > 1) More work for end-users. For a job with multiple sources that need to
> be
> > configured with a watermark lag threshold, users need to specify multiple
> > configs (one for each source) instead of specifying one job-level config.
> >
> > 2) More work for Flink operator developers. Overall there are more public
> > APIs (one Java API and one SQL property for each source that needs to
> > determine backlog based on watermark) exposed to end users. This also
> adds
> > more burden for the Flink community to maintain these APIs.
> >
> > 3) It would be hard (e.g. require backward incompatible API change) to
> > extend the Flink runtime to support job-level config to set watermark
> > strategy in the future (e.g. support the
> > pipeline.backlog.watermark-lag-threshold in option-1). This is because an
> > existing source operator's code might have hardcoded 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-25 Thread Piotr Nowojski
Hi Jark and Dong,

I'm a bit confused about the difference between the two competing options.
Could one of you elaborate what's the difference between:
> 2) The semantics of `#setIsProcessingBacklog(false)` is that it overrides
> the effect of the previous invocation (if any) of
> `#setIsProcessingBacklog(true)` on the given source instance.

and

> 2) The semantics of `#setIsProcessingBacklog(false)` is that the given
> source instance will have backlog=false.

?

Best,
Piotrek

czw., 21 wrz 2023 o 15:28 Dong Lin  napisał(a):

> Hi all,
>
> Jark and I discussed this FLIP offline and I will summarize our discussion
> below. It would be great if you could provide your opinion of the proposed
> options.
>
> Regarding the target use-cases:
> - We both agreed that MySQL CDC should have backlog=true when watermarkLag
> is large during the binlog phase.
> - Dong argued that other streaming sources with watermarkLag defined (e.g.
> Kafka) should also have backlog=true when watermarkLag is large. The
> pros/cons discussion below assumes this use-case needs to be supported.
>
> The 1st option is what is currently proposed in FLIP-328, with the
> following key characteristics:
> 1) There is one job-level config (i.e.
> pipeline.backlog.watermark-lag-threshold) that applies to all sources with
> watermarkLag metric defined.
> 2) The semantics of `#setIsProcessingBacklog(false)` is that it overrides
> the effect of the previous invocation (if any) of
> `#setIsProcessingBacklog(true)` on the given source instance.
>
> The 2nd option is what Jark proposed in this email thread, with the
> following key characteristics:
> 1) Add source-specific config (both Java API and SQL source property) to
> every source for which we want to set backlog status based on the
> watermarkLag metric. For example, we might add separate Java APIs
> `#setWatermarkLagThreshold`  for MySQL CDC source, HybridSource,
> KafkaSource, PulsarSource etc.
> 2) The semantics of `#setIsProcessingBacklog(false)` is that the given
> source instance will have backlog=false.
>
> Here are the key pros/cons of these two options.
>
> Cons of the 1st option:
> 1) The semantics of `#setIsProcessingBacklog(false)` is harder to
> understand for Flink operator developers than the corresponding semantics
> in option-2.
>
> Cons of the  2nd option:
> 1) More work for end-users. For a job with multiple sources that need to be
> configured with a watermark lag threshold, users need to specify multiple
> configs (one for each source) instead of specifying one job-level config.
>
> 2) More work for Flink operator developers. Overall there are more public
> APIs (one Java API and one SQL property for each source that needs to
> determine backlog based on watermark) exposed to end users. This also adds
> more burden for the Flink community to maintain these APIs.
>
> 3) It would be hard (e.g. require backward incompatible API change) to
> extend the Flink runtime to support job-level config to set watermark
> strategy in the future (e.g. support the
> pipeline.backlog.watermark-lag-threshold in option-1). This is because an
> existing source operator's code might have hardcoded an invocation of
> `#setIsProcessingBacklog(false)`, which means the backlog status must be
> set to false, which prevents Flink runtime from setting backlog=true when a
> new strategy is triggered.
>
> Overall, I am still inclined to choose option-1 because it is more
> extensible and simpler to use in the long term when we want to support/use
> multiple sources whose backlog status can change based on the watermark
> lag. While option-1's `#setIsProcessingBacklog` is a bit harder to
> understand than option-2, I think this overhead/cost is worthwhile as it
> makes end-users' life easier in the long term.
>
> Jark: thank you for taking the time to review this FLIP. Please feel free
> to comment if I missed anything in the pros/cons above.
>
> Jark and I have not reached agreement on which option is better. It will be
> really helpful if we can get more comments on these options.
>
> Thanks,
> Dong
>
>
> On Tue, Sep 19, 2023 at 11:26 AM Dong Lin  wrote:
>
> > Hi Jark,
> >
> > Thanks for the reply. Please see my comments inline.
> >
> > On Tue, Sep 19, 2023 at 10:12 AM Jark Wu  wrote:
> >
> >> Hi Dong,
> >>
> >> Sorry for the late reply.
> >>
> >> > The rationale is that if there is any strategy that is triggered and
> >> says
> >> > backlog=true, then job's backlog should be true. Otherwise, the job's
> >> > backlog status is false.
> >>
> >> I'm quite confused about this. Does that mean, if the source is in the
> >> changelog phase, the source has to continuously invoke
> >> "setIsProcessingBacklog(true)" (in an infinite loop?). Otherwise,
> >> the job's backlog status would be set to false by the framework?
> >>
> >
> > No, the source would not have to continuously invoke
> > setIsProcessingBacklog(true) in an infinite loop.
> >
> > Actually, I am not very sure why there is confusion that 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-21 Thread Dong Lin
Hi all,

Jark and I discussed this FLIP offline and I will summarize our discussion
below. It would be great if you could provide your opinion of the proposed
options.

Regarding the target use-cases:
- We both agreed that MySQL CDC should have backlog=true when watermarkLag
is large during the binlog phase.
- Dong argued that other streaming sources with watermarkLag defined (e.g.
Kafka) should also have backlog=true when watermarkLag is large. The
pros/cons discussion below assumes this use-case needs to be supported.

The 1st option is what is currently proposed in FLIP-328, with the
following key characteristics:
1) There is one job-level config (i.e.
pipeline.backlog.watermark-lag-threshold) that applies to all sources with
watermarkLag metric defined.
2) The semantics of `#setIsProcessingBacklog(false)` is that it overrides
the effect of the previous invocation (if any) of
`#setIsProcessingBacklog(true)` on the given source instance.

The 2nd option is what Jark proposed in this email thread, with the
following key characteristics:
1) Add source-specific config (both Java API and SQL source property) to
every source for which we want to set backlog status based on the
watermarkLag metric. For example, we might add separate Java APIs
`#setWatermarkLagThreshold`  for MySQL CDC source, HybridSource,
KafkaSource, PulsarSource etc.
2) The semantics of `#setIsProcessingBacklog(false)` is that the given
source instance will have backlog=false.

Here are the key pros/cons of these two options.

Cons of the 1st option:
1) The semantics of `#setIsProcessingBacklog(false)` is harder to
understand for Flink operator developers than the corresponding semantics
in option-2.

Cons of the  2nd option:
1) More work for end-users. For a job with multiple sources that need to be
configured with a watermark lag threshold, users need to specify multiple
configs (one for each source) instead of specifying one job-level config.

2) More work for Flink operator developers. Overall there are more public
APIs (one Java API and one SQL property for each source that needs to
determine backlog based on watermark) exposed to end users. This also adds
more burden for the Flink community to maintain these APIs.

3) It would be hard (e.g. require backward incompatible API change) to
extend the Flink runtime to support job-level config to set watermark
strategy in the future (e.g. support the
pipeline.backlog.watermark-lag-threshold in option-1). This is because an
existing source operator's code might have hardcoded an invocation of
`#setIsProcessingBacklog(false)`, which means the backlog status must be
set to false, which prevents Flink runtime from setting backlog=true when a
new strategy is triggered.

Overall, I am still inclined to choose option-1 because it is more
extensible and simpler to use in the long term when we want to support/use
multiple sources whose backlog status can change based on the watermark
lag. While option-1's `#setIsProcessingBacklog` is a bit harder to
understand than option-2, I think this overhead/cost is worthwhile as it
makes end-users' life easier in the long term.

Jark: thank you for taking the time to review this FLIP. Please feel free
to comment if I missed anything in the pros/cons above.

Jark and I have not reached agreement on which option is better. It will be
really helpful if we can get more comments on these options.

Thanks,
Dong


On Tue, Sep 19, 2023 at 11:26 AM Dong Lin  wrote:

> Hi Jark,
>
> Thanks for the reply. Please see my comments inline.
>
> On Tue, Sep 19, 2023 at 10:12 AM Jark Wu  wrote:
>
>> Hi Dong,
>>
>> Sorry for the late reply.
>>
>> > The rationale is that if there is any strategy that is triggered and
>> says
>> > backlog=true, then job's backlog should be true. Otherwise, the job's
>> > backlog status is false.
>>
>> I'm quite confused about this. Does that mean, if the source is in the
>> changelog phase, the source has to continuously invoke
>> "setIsProcessingBacklog(true)" (in an infinite loop?). Otherwise,
>> the job's backlog status would be set to false by the framework?
>>
>
> No, the source would not have to continuously invoke
> setIsProcessingBacklog(true) in an infinite loop.
>
> Actually, I am not very sure why there is confusion that "the job's
> backlog status would be set to false by the framework". Could you explain
> where that comes from?
>
> I guess it might be useful to provide a complete overview of how
> setIsProcessingBacklog(...)
> and pipeline.backlog.watermark-lag-threshold work together to determine the
> overall job's backlog status. Let me explain it below.
>
> Here is the semantics/behavior of setIsProcessingBacklog(..).
> - This method is invoked on a per-source basis.
> - This method can be invoked multiple times with true/false as its input
> parameter.
> - For a given source, the last invocation of this method overwrites the
> effect of earlier invocation of this method.
> - For a given source, if this method has been invoked 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-18 Thread Dong Lin
Hi Jark,

Thanks for the reply. Please see my comments inline.

On Tue, Sep 19, 2023 at 10:12 AM Jark Wu  wrote:

> Hi Dong,
>
> Sorry for the late reply.
>
> > The rationale is that if there is any strategy that is triggered and says
> > backlog=true, then job's backlog should be true. Otherwise, the job's
> > backlog status is false.
>
> I'm quite confused about this. Does that mean, if the source is in the
> changelog phase, the source has to continuously invoke
> "setIsProcessingBacklog(true)" (in an infinite loop?). Otherwise,
> the job's backlog status would be set to false by the framework?
>

No, the source would not have to continuously invoke
setIsProcessingBacklog(true) in an infinite loop.

Actually, I am not very sure why there is confusion that "the job's backlog
status would be set to false by the framework". Could you explain where
that comes from?

I guess it might be useful to provide a complete overview of how
setIsProcessingBacklog(...)
and pipeline.backlog.watermark-lag-threshold work together to determine the
overall job's backlog status. Let me explain it below.

Here is the semantics/behavior of setIsProcessingBacklog(..).
- This method is invoked on a per-source basis.
- This method can be invoked multiple times with true/false as its input
parameter.
- For a given source, the last invocation of this method overwrites the
effect of earlier invocation of this method.
- For a given source, if this method has been invoked at least once and the
last invocation of this method has isProcessingBacklog = true, then it is
guaranteed that the backlog status of this source is set to true.

Here is the semantics/behavior of pipeline.backlog.watermark-lag-threshold:
- This config is specified at the job level and applies to every source
included in this job.
- For a given source, if its watermarkLag metric is available (see
FLIP-33) and watermarkLag > watermark-lag-threshold, then it is guaranteed
that backlog status of this source is set to true.

Here is how the source's backlog status is determined: If a rule specified
above says the source's backlog status should be true, then the source's
backlog status is set to true. Otherwise, it is set to false.

Here is how the job's backlog status is determined: If there exists a source
that is currently running and the source's backlog status is set to true,
then the job's backlog status is set to true. Otherwise, it is set to false.
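
The rules above can be sketched roughly as follows. This is only an
illustration under stated assumptions: BacklogRules, SourceState and their
fields are hypothetical names, not Flink APIs, and the threshold is passed in
as plain milliseconds rather than read from the proposed config.

```java
import java.util.List;
import java.util.OptionalLong;

/** Hypothetical sketch of the resolution rules; not actual Flink code. */
final class BacklogRules {

    /** Snapshot of one currently running source (illustrative only). */
    static final class SourceState {
        // Argument of the last setIsProcessingBacklog(..) call; false if never invoked.
        final boolean lastSetIsProcessingBacklog;
        // watermarkLag metric in milliseconds; empty if the source does not report it.
        final OptionalLong watermarkLagMs;

        SourceState(boolean lastSetIsProcessingBacklog, OptionalLong watermarkLagMs) {
            this.lastSetIsProcessingBacklog = lastSetIsProcessingBacklog;
            this.watermarkLagMs = watermarkLagMs;
        }
    }

    /** A source's backlog status is true if any rule says true, otherwise false. */
    static boolean sourceBacklog(SourceState source, long lagThresholdMs) {
        boolean byApi = source.lastSetIsProcessingBacklog;
        boolean byLag = source.watermarkLagMs.isPresent()
                && source.watermarkLagMs.getAsLong() > lagThresholdMs;
        return byApi || byLag;
    }

    /** The job's backlog status is true if any running source's status is true. */
    static boolean jobBacklog(List<SourceState> runningSources, long lagThresholdMs) {
        for (SourceState source : runningSources) {
            if (sourceBacklog(source, lagThresholdMs)) {
                return true;
            }
        }
        return false;
    }
}
```

Note that no branch in this sketch forces a status to false; false is only
the result when no rule fires.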

Hopefully this can help clarify the behavior. If needed, we can update the
relevant doc (e.g. setIsProcessingBacklog() Java doc) to make semantics
above clearer for Flink users.

And I would be happy to discuss/explain this design offline when you have
time.

Thanks,
Dong


>
> Best,
> Jark
>
> On Tue, 19 Sept 2023 at 09:13, Dong Lin  wrote:
>
> > Hi Jark,
> >
> > Do you have time to comment on whether the current design looks good?
> >
> > I plan to start voting in 3 days if there is no follow-up comment.
> >
> > Thanks,
> > Dong
> >
> >
> > On Fri, Sep 15, 2023 at 2:01 PM Jark Wu  wrote:
> >
> > > Hi Dong,
> > >
> > > > Note that we can not simply enforce the semantics of "any invocation
> of
> > > > setIsProcessingBacklog(false) will set the job's backlog status to
> > > false".
> > > > Suppose we have a job with two operators, where operatorA invokes
> > > > setIsProcessingBacklog(false) and operatorB invokes
> > > > setIsProcessingBacklog(true). There will be conflict if we use the
> > > > semantics of "any invocation of setIsProcessingBacklog(false) will
> set
> > > the
> > > > job's backlog status to false".
> > >
> > > So it should set job's backlog status to false if the job has only a
> > single
> > > source,
> > > right?
> > >
> > > Could you elaborate on the behavior if there is a job with a single
> > source,
> > > and the watermark lag exceeds the configured value (should set backlog
> to
> > > true?),
> > > but the source invokes "setIsProcessingBacklog(false)"? Or the inverse
> > one,
> > > the source invokes "setIsProcessingBacklog(false)" first, but the
> > watermark
> > > lag
> > > exceeds the configured value.
> > >
> > > This is the conflict I'm concerned about.
> > >
> > > Best,
> > > Jark
> > >
> > > On Fri, 15 Sept 2023 at 12:00, Dong Lin  wrote:
> > >
> > > > Hi Jark,
> > > >
> > > > Please see my comments inline.
> > > >
> > > > On Fri, Sep 15, 2023 at 10:35 AM Jark Wu  wrote:
> > > >
> > > > > Hi Dong,
> > > > >
> > > > > Please see my comments inline below.
> > > >
> > > >
> > > > > > Hmm.. can you explain what you mean by "different watermark delay
> > > > > > definitions for each source"?
> > > > >
> > > > > For example, "table1" defines a watermark with delay 5 seconds,
> > > > > "table2" defines a watermark with delay 10 seconds. They have
> > different
> > > > > watermark delay definitions. So it is also reasonable they have
> > > different
> > > > > watermark lag definitions, e.g., "table1" allows "10mins" and
> > "table2"
> > > > > allows "20mins".
> > > > >
> > > >
> > > > I think the 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-18 Thread Jark Wu
Hi Dong,

Sorry for the late reply.

> The rationale is that if there is any strategy that is triggered and says
> backlog=true, then job's backlog should be true. Otherwise, the job's
> backlog status is false.

I'm quite confused about this. Does that mean, if the source is in the
changelog phase, the source has to continuously invoke
"setIsProcessingBacklog(true)" (in an infinite loop?). Otherwise,
the job's backlog status would be set to false by the framework?

Best,
Jark

On Tue, 19 Sept 2023 at 09:13, Dong Lin  wrote:

> Hi Jark,
>
> Do you have time to comment on whether the current design looks good?
>
> I plan to start voting in 3 days if there is no follow-up comment.
>
> Thanks,
> Dong
>
>
> On Fri, Sep 15, 2023 at 2:01 PM Jark Wu  wrote:
>
> > Hi Dong,
> >
> > > Note that we can not simply enforce the semantics of "any invocation of
> > > setIsProcessingBacklog(false) will set the job's backlog status to
> > false".
> > > Suppose we have a job with two operators, where operatorA invokes
> > > setIsProcessingBacklog(false) and operatorB invokes
> > > setIsProcessingBacklog(true). There will be conflict if we use the
> > > semantics of "any invocation of setIsProcessingBacklog(false) will set
> > the
> > > job's backlog status to false".
> >
> > So it should set job's backlog status to false if the job has only a
> single
> > source,
> > right?
> >
> > Could you elaborate on the behavior if there is a job with a single
> source,
> > and the watermark lag exceeds the configured value (should set backlog to
> > true?),
> > but the source invokes "setIsProcessingBacklog(false)"? Or the inverse
> one,
> > the source invokes "setIsProcessingBacklog(false)" first, but the
> watermark
> > lag
> > exceeds the configured value.
> >
> > This is the conflict I'm concerned about.
> >
> > Best,
> > Jark
> >
> > On Fri, 15 Sept 2023 at 12:00, Dong Lin  wrote:
> >
> > > Hi Jark,
> > >
> > > Please see my comments inline.
> > >
> > > On Fri, Sep 15, 2023 at 10:35 AM Jark Wu  wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > Please see my comments inline below.
> > >
> > >
> > > > > Hmm.. can you explain what you mean by "different watermark delay
> > > > > definitions for each source"?
> > > >
> > > > For example, "table1" defines a watermark with delay 5 seconds,
> > > > "table2" defines a watermark with delay 10 seconds. They have
> different
> > > > watermark delay definitions. So it is also reasonable they have
> > different
> > > > watermark lag definitions, e.g., "table1" allows "10mins" and
> "table2"
> > > > allows "20mins".
> > > >
> > >
> > > I think the watermark delay you mentioned above is conceptually /
> > > fundamentally different from the watermark-lag-threshold proposed in
> this
> > > FLIP.
> > >
> > > It might be useful to revisit the semantics of these two concepts:
> > > - watermark delay is used to account for the maximum amount of
> > > out-of-orderliness that users expect (or are willing to wait for) for
> > > records from a given source.
> > > - watermark-lag-threshold is used to define when processing latency is
> no
> > > longer important (e.g. because data is already stale).
> > >
> > > Even though users might expect different out of orderliness for
> different
> > > sources, users do not necessarily have different definitions /
> thresholds
> > > for when a record is considered "already stale".
> > >
> > >
> > > >
> > > > > I think there is probably misunderstanding here. FLIP-309 does NOT
> > > > directly
> > > > > specify when backlog is false. It is intentionally specified in
> such
> > a
> > > > way
> > > > > that there will  not be any conflict between these rules.
> > > >
> > > > Do you mean FLIP-309 doesn't allow to specify backlog to be false?
> > > > Is this mentioned in FLIP-309? This is completely different from
> what I
> > > >
> > >
> > > Can you explain what you mean by "allow to specify backlog to be
> false"?
> > >
> > > If what you mean is that "can invoke setIsProcessingBacklog(false)",
> then
> > > FLIP-309 supports doing this.
> > >
> > > If what you mean is that "any invocation of
> setIsProcessingBacklog(false)
> > > will set the job's backlog status to false", then FLIP-309 does not
> > support
> > > this. I believe the existing Java doc of this API and FLIP-309 is
> > > compatible with this explanation.
> > >
> > > Note that we can not simply enforce the semantics of "any invocation of
> > > setIsProcessingBacklog(false) will set the job's backlog status to
> > false".
> > > Suppose we have a job with two operators, where operatorA invokes
> > > setIsProcessingBacklog(false) and operatorB invokes
> > > setIsProcessingBacklog(true). There will be conflict if we use the
> > > semantics of "any invocation of setIsProcessingBacklog(false) will set
> > the
> > > job's backlog status to false".
> > >
> > > Would this answer your question?
> > >
> > > Best,
> > > Dong
> > >
> > >
> > > > understand. From the API interface
> > "ctx.setIsProcessingBacklog(boolean)",
> > > > it 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-18 Thread Dong Lin
Hi Jark,

Do you have time to comment on whether the current design looks good?

I plan to start voting in 3 days if there is no follow-up comment.

Thanks,
Dong


On Fri, Sep 15, 2023 at 2:01 PM Jark Wu  wrote:

> Hi Dong,
>
> > Note that we can not simply enforce the semantics of "any invocation of
> > setIsProcessingBacklog(false) will set the job's backlog status to
> false".
> > Suppose we have a job with two operators, where operatorA invokes
> > setIsProcessingBacklog(false) and operatorB invokes
> > setIsProcessingBacklog(true). There will be conflict if we use the
> > semantics of "any invocation of setIsProcessingBacklog(false) will set
> the
> > job's backlog status to false".
>
> So it should set job's backlog status to false if the job has only a single
> source,
> right?
>
> Could you elaborate on the behavior if there is a job with a single source,
> and the watermark lag exceeds the configured value (should set backlog to
> true?),
> but the source invokes "setIsProcessingBacklog(false)"? Or the inverse one,
> the source invokes "setIsProcessingBacklog(false)" first, but the watermark
> lag
> exceeds the configured value.
>
> This is the conflict I'm concerned about.
>
> Best,
> Jark
>
> On Fri, 15 Sept 2023 at 12:00, Dong Lin  wrote:
>
> > Hi Jark,
> >
> > Please see my comments inline.
> >
> > On Fri, Sep 15, 2023 at 10:35 AM Jark Wu  wrote:
> >
> > > Hi Dong,
> > >
> > > Please see my comments inline below.
> >
> >
> > > > Hmm.. can you explain what you mean by "different watermark delay
> > > > definitions for each source"?
> > >
> > > For example, "table1" defines a watermark with delay 5 seconds,
> > > "table2" defines a watermark with delay 10 seconds. They have different
> > > watermark delay definitions. So it is also reasonable they have
> different
> > > watermark lag definitions, e.g., "table1" allows "10mins" and "table2"
> > > allows "20mins".
> > >
> >
> > I think the watermark delay you mentioned above is conceptually /
> > fundamentally different from the watermark-lag-threshold proposed in this
> > FLIP.
> >
> > It might be useful to revisit the semantics of these two concepts:
> > - watermark delay is used to account for the maximum amount of
> > out-of-orderliness that users expect (or are willing to wait for) for
> > records from a given source.
> > - watermark-lag-threshold is used to define when processing latency is no
> > longer important (e.g. because data is already stale).
> >
> > Even though users might expect different out of orderliness for different
> > sources, users do not necessarily have different definitions / thresholds
> > for when a record is considered "already stale".
> >
> >
> > >
> > > > I think there is probably misunderstanding here. FLIP-309 does NOT
> > > directly
> > > > specify when backlog is false. It is intentionally specified in such
> a
> > > way
> > > > that there will  not be any conflict between these rules.
> > >
> > > Do you mean FLIP-309 doesn't allow to specify backlog to be false?
> > > Is this mentioned in FLIP-309? This is completely different from what I
> > >
> >
> > Can you explain what you mean by "allow to specify backlog to be false"?
> >
> > If what you mean is that "can invoke setIsProcessingBacklog(false)", then
> > FLIP-309 supports doing this.
> >
> > If what you mean is that "any invocation of setIsProcessingBacklog(false)
> > will set the job's backlog status to false", then FLIP-309 does not
> support
> > this. I believe the existing Java doc of this API and FLIP-309 is
> > compatible with this explanation.
> >
> > Note that we can not simply enforce the semantics of "any invocation of
> > setIsProcessingBacklog(false) will set the job's backlog status to
> false".
> > Suppose we have a job with two operators, where operatorA invokes
> > setIsProcessingBacklog(false) and operatorB invokes
> > setIsProcessingBacklog(true). There will be conflict if we use the
> > semantics of "any invocation of setIsProcessingBacklog(false) will set
> the
> > job's backlog status to false".
> >
> > Would this answer your question?
> >
> > Best,
> > Dong
> >
> >
> > > understand. From the API interface
> "ctx.setIsProcessingBacklog(boolean)",
> > > it allows users to invoke "setIsProcessingBacklog(false)". And FLIP-309
> > > also says "MySQL CDC source should report isProcessingBacklog=false
> > > at the beginning of the changelog stage." If not, maybe we need to
> > revisit
> > > FLIP-309.
> >
> >
> > > Best,
> > > Jark
> > >
> > >
> > >
> > > On Fri, 15 Sept 2023 at 08:41, Dong Lin  wrote:
> > >
> > > > Hi Jark,
> > > >
> > > > Do you have any follow-up comment?
> > > >
> > > > My gut feeling is that suppose we need to support per-source
> watermark
> > > lag
> > > > specification in the future (not sure we have a use-case for this
> right
> > > > now), we can add such a config in the future with a follow-up FLIP.
> The
> > > > job-level config will still be useful as it makes users'
> configuration
> > > > simpler for common scenarios.
> 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-15 Thread Dong Lin
Hi Jark,

Please see my reply inline.

On Fri, Sep 15, 2023 at 2:01 PM Jark Wu  wrote:

> Hi Dong,
>
> > Note that we can not simply enforce the semantics of "any invocation of
> > setIsProcessingBacklog(false) will set the job's backlog status to
> false".
> > Suppose we have a job with two operators, where operatorA invokes
> > setIsProcessingBacklog(false) and operatorB invokes
> > setIsProcessingBacklog(true). There will be conflict if we use the
> > semantics of "any invocation of setIsProcessingBacklog(false) will set
> the
> > job's backlog status to false".
>
> So it should set job's backlog status to false if the job has only a single
> source,
> right?
>

As of FLIP-309's implementation, with the constraint that there is only
one API to set backlog=true, your understanding is correct.

However, it might be useful to recall that FLIP-309 is proposed with the
explicit plan to add more strategies to set backlog=true, which is
documented in its future work section. The above statement, which assumes
there is only one strategy, is not useful in the long term.

With this knowledge in mind, a better and more long-lasting way to
interpret the semantics of setIsProcessingBacklog() would be this: "it
should set job's backlog status to false if the job has only a single
source AND there is no other strategy to set backlog=true".


> Could you elaborate on the behavior if there is a job with a single source,
> and the watermark lag exceeds the configured value (should set backlog to
> true?),
> but the source invokes "setIsProcessingBacklog(false)"? Or the inverse one,
> the source invokes "setIsProcessingBacklog(false)" first, but the watermark
> lag
> exceeds the configured value.
>

If setIsProcessingBacklog(false) is invoked and watermark lag exceeds
the configured value, then the job's backlog status is set to true.

The rationale is that if there is any strategy that is triggered and says
backlog=true, then job's backlog should be true. Otherwise, the job's
backlog status is false.

The key idea to avoid conflict is that we will not add any API with the
capability to explicitly set job's backlog status to false. A job's backlog
status is false if and only if no rule says it should be set to true.
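
To make the rule above concrete, here is a minimal sketch in plain Java. It is
not Flink code: the class BacklogRules and its methods are hypothetical names
used only for illustration, and the watermark-lag rule is reduced to a single
boolean. It shows that a source calling setIsProcessingBacklog(false) only
withdraws that source's own "true" vote, and that the job's status is true
whenever any rule says true.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration only; this class is not part of Flink's API. */
final class BacklogRules {
    // Value passed in the most recent setIsProcessingBacklog(..) call, per source.
    private final Map<String, Boolean> reportedBySource = new HashMap<>();
    // Whether the (assumed) job-level watermark-lag rule is currently triggered.
    private boolean watermarkLagExceeded;

    void setIsProcessingBacklog(String sourceId, boolean value) {
        // A 'false' here only replaces this source's earlier report; it forces nothing.
        reportedBySource.put(sourceId, value);
    }

    void setWatermarkLagExceeded(boolean exceeded) {
        watermarkLagExceeded = exceeded;
    }

    /** True if any rule says true; false only when no rule says true. */
    boolean isJobProcessingBacklog() {
        return watermarkLagExceeded || reportedBySource.containsValue(true);
    }

    public static void main(String[] args) {
        BacklogRules rules = new BacklogRules();
        rules.setIsProcessingBacklog("mysql-cdc", false);
        rules.setWatermarkLagExceeded(true);
        System.out.println(rules.isJobProcessingBacklog());
    }
}
```

With this shape there is no call that can force the job's status to false, so
two rules can never contradict each other.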

BTW, I can understand that setIsProcessingBacklog(false) appears to suggest
the job's backlog is set to false. We can certainly update its Java doc to
make the semantics clearer. If you have any suggestion on a better name for
this method, we can also update it accordingly.

What do you think?


> This is the conflict I'm concerned about.
>
> Best,
> Jark
>
> On Fri, 15 Sept 2023 at 12:00, Dong Lin  wrote:
>
> > Hi Jark,
> >
> > Please see my comments inline.
> >
> > On Fri, Sep 15, 2023 at 10:35 AM Jark Wu  wrote:
> >
> > > Hi Dong,
> > >
> > > Please see my comments inline below.
> >
> >
> > > > Hmm.. can you explain what you mean by "different watermark delay
> > > > definitions for each source"?
> > >
> > > For example, "table1" defines a watermark with delay 5 seconds,
> > > "table2" defines a watermark with delay 10 seconds. They have different
> > > watermark delay definitions. So it is also reasonable they have
> different
> > > watermark lag definitions, e.g., "table1" allows "10mins" and "table2"
> > > allows "20mins".
> > >
> >
> > I think the watermark delay you mentioned above is conceptually /
> > fundamentally different from the watermark-lag-threshold proposed in this
> > FLIP.
> >
> > It might be useful to revisit the semantics of these two concepts:
> > - watermark delay is used to account for the maximum amount of
> > out-of-orderliness that users expect (or are willing to wait for) in
> > records from a given source.
> > - watermark-lag-threshold is used to define when processing latency is no
> > longer important (e.g. because data is already stale).
> >
> > Even though users might expect different out of orderliness for different
> > sources, users do not necessarily have different definitions / thresholds
> > for when a record is considered "already stale".
> >
> >
> > >
> > > > I think there is probably misunderstanding here. FLIP-309 does NOT
> > > directly
> > > > specify when backlog is false. It is intentionally specified in such
> a
> > > way
> > > > that there will  not be any conflict between these rules.
> > >
> > > Do you mean FLIP-309 doesn't allow to specify backlog to be false?
> > > Is this mentioned in FLIP-309? This is completely different from what I
> > >
> >
> > Can you explain what you mean by "allow to specify backlog to be false"?
> >
> > If what you mean is that "can invoke setIsProcessingBacklog(false)", then
> > FLIP-309 supports doing this.
> >
> > If what you mean is that "any invocation of setIsProcessingBacklog(false)
> > will set the job's backlog status to false", then FLIP-309 does not
> support
> > this. I believe the existing Java doc of this API and FLIP-309 is
> > compatible with this explanation.
> >
> > Note that we can not simply enforce the 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-15 Thread Jark Wu
Hi Dong,

> Note that we can not simply enforce the semantics of "any invocation of
> setIsProcessingBacklog(false) will set the job's backlog status to false".
> Suppose we have a job with two operators, where operatorA invokes
> setIsProcessingBacklog(false) and operatorB invokes
> setIsProcessingBacklog(true). There will be conflict if we use the
> semantics of "any invocation of setIsProcessingBacklog(false) will set the
> job's backlog status to false".

So it should set job's backlog status to false if the job has only a single
source,
right?

Could you elaborate on the behavior if there is a job with a single source,
and the watermark lag exceeds the configured value (should set backlog to
true?),
but the source invokes "setIsProcessingBacklog(false)"? Or the inverse one,
the source invokes "setIsProcessingBacklog(false)" first, but the watermark
lag
exceeds the configured value.

This is the conflict I'm concerned about.

Best,
Jark

On Fri, 15 Sept 2023 at 12:00, Dong Lin  wrote:

> Hi Jark,
>
> Please see my comments inline.
>
> On Fri, Sep 15, 2023 at 10:35 AM Jark Wu  wrote:
>
> > Hi Dong,
> >
> > Please see my comments inline below.
>
>
> > > Hmm.. can you explain what you mean by "different watermark delay
> > > definitions for each source"?
> >
> > For example, "table1" defines a watermark with delay 5 seconds,
> > "table2" defines a watermark with delay 10 seconds. They have different
> > watermark delay definitions. So it is also reasonable they have different
> > watermark lag definitions, e.g., "table1" allows "10mins" and "table2"
> > allows "20mins".
> >
>
> I think the watermark delay you mentioned above is conceptually /
> fundamentally different from the watermark-lag-threshold proposed in this
> FLIP.
>
> It might be useful to revisit the semantics of these two concepts:
> - watermark delay is used to account for the maximum amount of
> out-of-orderliness that users expect (or are willing to wait for) in records
> from a given source.
> - watermark-lag-threshold is used to define when processing latency is no
> longer important (e.g. because data is already stale).
>
> Even though users might expect different out of orderliness for different
> sources, users do not necessarily have different definitions / thresholds
> for when a record is considered "already stale".
>
>
> >
> > > I think there is probably misunderstanding here. FLIP-309 does NOT
> > directly
> > > specify when backlog is false. It is intentionally specified in such a
> > way
> > > that there will  not be any conflict between these rules.
> >
> > Do you mean FLIP-309 doesn't allow to specify backlog to be false?
> > Is this mentioned in FLIP-309? This is completely different from what I
> >
>
> Can you explain what you mean by "allow to specify backlog to be false"?
>
> If what you mean is that "can invoke setIsProcessingBacklog(false)", then
> FLIP-309 supports doing this.
>
> If what you mean is that "any invocation of setIsProcessingBacklog(false)
> will set the job's backlog status to false", then FLIP-309 does not support
> this. I believe the existing Java doc of this API and FLIP-309 is
> compatible with this explanation.
>
> Note that we can not simply enforce the semantics of "any invocation of
> setIsProcessingBacklog(false) will set the job's backlog status to false".
> Suppose we have a job with two operators, where operatorA invokes
> setIsProcessingBacklog(false) and operatorB invokes
> setIsProcessingBacklog(true). There will be conflict if we use the
> semantics of "any invocation of setIsProcessingBacklog(false) will set the
> job's backlog status to false".
>
> Would this answer your question?
>
> Best,
> Dong
>
>
> > understand. From the API interface "ctx.setIsProcessingBacklog(boolean)",
> > it allows users to invoke "setIsProcessingBacklog(false)". And FLIP-309
> > also says "MySQL CDC source should report isProcessingBacklog=false
> > at the beginning of the changelog stage." If not, maybe we need to
> revisit
> > FLIP-309.
>
>
> > Best,
> > Jark
> >
> >
> >
> > On Fri, 15 Sept 2023 at 08:41, Dong Lin  wrote:
> >
> > > Hi Jark,
> > >
> > > Do you have any follow-up comment?
> > >
> > > My gut feeling is that suppose we need to support per-source watermark
> > lag
> > > specification in the future (not sure we have a use-case for this right
> > > now), we can add such a config in the future with a follow-up FLIP. The
> > > job-level config will still be useful as it makes users' configuration
> > > simpler for common scenarios.
> > >
> > > If it is OK, can we agree to make incremental progress for Flink and
> > start
> > > a voting thread for this FLIP?
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > > On Mon, Sep 11, 2023 at 4:41 PM Jark Wu  wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > Please see my comments inline.
> > > >
> > > > >  As a result, the proposed job-level
> > > > > config will be applied only in the changelog stage. So there is no
> > > > > difference between these two approaches in this 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-14 Thread Dong Lin
Hi Jark,

Please see my comments inline.

On Fri, Sep 15, 2023 at 10:35 AM Jark Wu  wrote:

> Hi Dong,
>
> Please see my comments inline below.


> > Hmm.. can you explain what you mean by "different watermark delay
> > definitions for each source"?
>
> For example, "table1" defines a watermark with delay 5 seconds,
> "table2" defines a watermark with delay 10 seconds. They have different
> watermark delay definitions. So it is also reasonable they have different
> watermark lag definitions, e.g., "table1" allows "10mins" and "table2"
> allows "20mins".
>

I think the watermark delay you mentioned above is conceptually /
fundamentally different from the watermark-lag-threshold proposed in this
FLIP.

It might be useful to revisit the semantics of these two concepts:
- watermark delay is used to account for the maximum amount of
out-of-orderliness that users expect (or are willing to wait for) in records
from a given source.
- watermark-lag-threshold is used to define when processing latency is no
longer important (e.g. because data is already stale).

Even though users might expect different out-of-orderliness for different
sources, users do not necessarily have different definitions / thresholds
for when a record is considered "already stale".
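
As a rough illustration of why these are independent knobs, here is a small
plain-Java sketch. It assumes that watermark lag means "wall-clock time minus
current watermark" and that the watermark trails the largest event time seen
by the configured delay. The class WatermarkConcepts and its methods are
hypothetical and not part of Flink.

```java
import java.time.Duration;

/** Hypothetical illustration only; this class is not part of Flink's API. */
final class WatermarkConcepts {
    /** Watermark delay: the watermark trails the largest event time seen by 'delay'. */
    static long watermark(long maxEventTimeMs, Duration delay) {
        return maxEventTimeMs - delay.toMillis();
    }

    /** Watermark lag (assumed definition): how far the watermark trails the wall clock. */
    static long watermarkLag(long wallClockMs, long watermarkMs) {
        return wallClockMs - watermarkMs;
    }

    /** Staleness check: one threshold can be shared by all sources. */
    static boolean isStale(long lagMs, Duration threshold) {
        return lagMs > threshold.toMillis();
    }

    public static void main(String[] args) {
        long now = 3_600_000L; // wall clock in ms (arbitrary value for the example)
        long maxEventTime = now - Duration.ofMinutes(30).toMillis();
        Duration sharedThreshold = Duration.ofMinutes(10);

        // Two sources with different watermark delays (5s vs 10s) ...
        long table1Watermark = watermark(maxEventTime, Duration.ofSeconds(5));
        long table2Watermark = watermark(maxEventTime, Duration.ofSeconds(10));

        // ... are still judged against the same staleness threshold.
        System.out.println(isStale(watermarkLag(now, table1Watermark), sharedThreshold));
        System.out.println(isStale(watermarkLag(now, table2Watermark), sharedThreshold));
    }
}
```

The delay changes the lag by a few seconds at most, while the threshold is
about minutes of staleness, which is why a single job-level value can be
reasonable even when the delays differ.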


>
> > I think there is probably misunderstanding here. FLIP-309 does NOT
> directly
> > specify when backlog is false. It is intentionally specified in such a
> way
> > that there will  not be any conflict between these rules.
>
> Do you mean FLIP-309 doesn't allow to specify backlog to be false?
> Is this mentioned in FLIP-309? This is completely different from what I
>

Can you explain what you mean by "allow to specify backlog to be false"?

If what you mean is that "can invoke setIsProcessingBacklog(false)", then
FLIP-309 supports doing this.

If what you mean is that "any invocation of setIsProcessingBacklog(false)
will set the job's backlog status to false", then FLIP-309 does not support
this. I believe the existing Java doc of this API and FLIP-309 are
compatible with this explanation.

Note that we can not simply enforce the semantics of "any invocation of
setIsProcessingBacklog(false) will set the job's backlog status to false".
Suppose we have a job with two operators, where operatorA invokes
setIsProcessingBacklog(false) and operatorB invokes
setIsProcessingBacklog(true). There will be conflict if we use the
semantics of "any invocation of setIsProcessingBacklog(false) will set the
job's backlog status to false".
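
A small plain-Java sketch of this conflict, for illustration only. It assumes
that "forcing" semantics would be implemented as last-call-wins on a single
job-level flag, which is just one possible reading. The class ConflictDemo is
hypothetical and not part of Flink.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration only; this class is not part of Flink's API. */
final class ConflictDemo {
    /** "Forcing" semantics (assumed): each call overwrites the job status, so the last caller wins. */
    static boolean lastCallWins(List<Boolean> callsInArrivalOrder) {
        boolean status = false;
        for (boolean value : callsInArrivalOrder) {
            status = value;
        }
        return status;
    }

    /** "Any rule says true" semantics: the result does not depend on arrival order. */
    static boolean anyTrue(List<Boolean> latestValuePerOperator) {
        return latestValuePerOperator.contains(true);
    }

    public static void main(String[] args) {
        // operatorA invokes setIsProcessingBacklog(false), operatorB invokes (true).
        List<Boolean> aThenB = Arrays.asList(false, true);
        List<Boolean> bThenA = Arrays.asList(true, false);

        System.out.println(lastCallWins(aThenB) + " vs " + lastCallWins(bThenA));
        System.out.println(anyTrue(aThenB) + " vs " + anyTrue(bThenA));
    }
}
```

Under the forcing reading, the job's status depends on which operator happened
to call last; under the "any rule says true" reading, both orders agree.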

Would this answer your question?

Best,
Dong


> understand. From the API interface "ctx.setIsProcessingBacklog(boolean)",
> it allows users to invoke "setIsProcessingBacklog(false)". And FLIP-309
> also says "MySQL CDC source should report isProcessingBacklog=false
> at the beginning of the changelog stage." If not, maybe we need to revisit
> FLIP-309.


> Best,
> Jark
>
>
>
> On Fri, 15 Sept 2023 at 08:41, Dong Lin  wrote:
>
> > Hi Jark,
> >
> > Do you have any follow-up comment?
> >
> > My gut feeling is that suppose we need to support per-source watermark
> lag
> > specification in the future (not sure we have a use-case for this right
> > now), we can add such a config in the future with a follow-up FLIP. The
> > job-level config will still be useful as it makes users' configuration
> > simpler for common scenarios.
> >
> > If it is OK, can we agree to make incremental progress for Flink and
> start
> > a voting thread for this FLIP?
> >
> > Thanks,
> > Dong
> >
> >
> > On Mon, Sep 11, 2023 at 4:41 PM Jark Wu  wrote:
> >
> > > Hi Dong,
> > >
> > > Please see my comments inline.
> > >
> > > >  As a result, the proposed job-level
> > > > config will be applied only in the changelog stage. So there is no
> > > > difference between these two approaches in this particular case,
> right?
> > >
> > > How the job-level config can be applied ONLY in the changelog stage?
> > > I think it is only possible if it is implemented by the CDC source
> > itself,
> > > because the framework doesn't know which stage of the source is.
> > > Know that the CDC source may emit watermarks with a very small lag
> > > in the snapshot stage, and the job-level config may turn the backlog
> > > status into false.
> > >
> > > > On the other hand, per-source config will be necessary if users want
> to
> > > > apply different watermark lag thresholds for different sources in the
> > > same
> > > > job.
> > >
> > > We also have different watermark delay definitions for each source,
> > > I think this's also reasonable and necessary to have different
> watermark
> > > lags.
> > >
> > >
> > > > Each source can have its own rule that specifies when the backlog can
> > be
> > > true
> > > > (e.g. MySql CDC says the backlog should be true during the snapshot
> > > stage).
> > > > And we can have a job-level config that specifies when the backlog
> > should
> > > > be true. Note that it is designed in such a way that none of these
> > rules
> > > > specify when the backlog should be 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-14 Thread Jark Wu
Hi Dong,

Please see my comments inline below.

> Hmm.. can you explain what you mean by "different watermark delay
> definitions for each source"?

For example, "table1" defines a watermark with delay 5 seconds,
"table2" defines a watermark with delay 10 seconds. They have different
watermark delay definitions. So it is also reasonable they have different
watermark lag definitions, e.g., "table1" allows "10mins" and "table2"
allows "20mins".

> I think there is probably misunderstanding here. FLIP-309 does NOT directly
> specify when backlog is false. It is intentionally specified in such a way
> that there will not be any conflict between these rules.

Do you mean FLIP-309 doesn't allow to specify backlog to be false?
Is this mentioned in FLIP-309? This is completely different from what I
understand. From the API interface "ctx.setIsProcessingBacklog(boolean)",
it allows users to invoke "setIsProcessingBacklog(false)". And FLIP-309
also says "MySQL CDC source should report isProcessingBacklog=false
at the beginning of the changelog stage." If not, maybe we need to revisit
FLIP-309.

Best,
Jark



On Fri, 15 Sept 2023 at 08:41, Dong Lin  wrote:

> Hi Jark,
>
> Do you have any follow-up comment?
>
> My gut feeling is that suppose we need to support per-source watermark lag
> specification in the future (not sure we have a use-case for this right
> now), we can add such a config in the future with a follow-up FLIP. The
> job-level config will still be useful as it makes users' configuration
> simpler for common scenarios.
>
> If it is OK, can we agree to make incremental progress for Flink and start
> a voting thread for this FLIP?
>
> Thanks,
> Dong
>
>
> On Mon, Sep 11, 2023 at 4:41 PM Jark Wu  wrote:
>
> > Hi Dong,
> >
> > Please see my comments inline.
> >
> > >  As a result, the proposed job-level
> > > config will be applied only in the changelog stage. So there is no
> > > difference between these two approaches in this particular case, right?
> >
> > How the job-level config can be applied ONLY in the changelog stage?
> > I think it is only possible if it is implemented by the CDC source
> itself,
> > because the framework doesn't know which stage of the source is.
> > Know that the CDC source may emit watermarks with a very small lag
> > in the snapshot stage, and the job-level config may turn the backlog
> > status into false.
> >
> > > On the other hand, per-source config will be necessary if users want to
> > > apply different watermark lag thresholds for different sources in the
> > same
> > > job.
> >
> > We also have different watermark delay definitions for each source,
> > I think this's also reasonable and necessary to have different watermark
> > lags.
> >
> >
> > > Each source can have its own rule that specifies when the backlog can
> be
> > true
> > > (e.g. MySql CDC says the backlog should be true during the snapshot
> > stage).
> > > And we can have a job-level config that specifies when the backlog
> should
> > > be true. Note that it is designed in such a way that none of these
> rules
> > > specify when the backlog should be false. That is why there is no
> > conflict
> > > by definition.
> >
> > IIUC, FLIP-309 provides `setIsProcessingBacklog` to specify when the
> > backlog
> > is true and when is FALSE. This conflicts with the job-level config as it
> > will turn
> > the status into true.
> >
> > > If I understand your comments correctly, you mean that we might have a
> > > Flink SQL DDL with user-defined watermark expressions. And users also
> > want
> > > to set the backlog to true if the watermark generated by that
> > > user-specified expression exceeds a threshold.
> >
> > No. I mean the source may not support generating watermarks, so the
> > watermark
> > expression is applied in a following operator (instead of in the source
> > operator).
> > This will result in the watermark lag doesn't work in this case and
> confuse
> > users.
> >
> > > You are right that this is a limitation. However, this is only a
> > short-term
> > > limitation which we added to make sure that we can focus on the
> > capability
> > > to switch from backlog=true to backlog=false. In the future, we will
> > remove
> > > this limitation and also support switching from backlog=false to
> > > backlog=true.
> >
> > I can understand it may be difficult to support runtime mode switching
> back
> > and forth.
> > However, I think this should be a limitation of FLIP-327, not FLIP-328.
> > IIUC,
> > FLIP-309 doesn't have this limitation, right? I just don't understand
> > what's the
> > challenge to switch a flag?
> >
> > Best,
> > Jark
> >
> >
> > On Sun, 10 Sept 2023 at 19:44, Dong Lin  wrote:
> >
> > > Hi Jark,
> > >
> > > Thanks for the comments. Please see my comments inline.
> > >
> > > On Sat, Sep 9, 2023 at 4:13 PM Jark Wu  wrote:
> > >
> > > > Hi Xuannan,
> > > >
> > > > I leave my comments inline.
> > > >
> > > > > In the case where a user wants to
> > > > > use a CDC source and also determine 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-14 Thread Dong Lin
Hi Jark,

Do you have any follow-up comment?

My gut feeling is that if we need to support per-source watermark lag
specification in the future (not sure we have a use-case for this right
now), we can add such a config with a follow-up FLIP. The
job-level config will still be useful as it makes users' configuration
simpler for common scenarios.

If it is OK, can we agree to make incremental progress for Flink and start
a voting thread for this FLIP?

Thanks,
Dong


On Mon, Sep 11, 2023 at 4:41 PM Jark Wu  wrote:

> Hi Dong,
>
> Please see my comments inline.
>
> >  As a result, the proposed job-level
> > config will be applied only in the changelog stage. So there is no
> > difference between these two approaches in this particular case, right?
>
> How the job-level config can be applied ONLY in the changelog stage?
> I think it is only possible if it is implemented by the CDC source itself,
> because the framework doesn't know which stage of the source is.
> Know that the CDC source may emit watermarks with a very small lag
> in the snapshot stage, and the job-level config may turn the backlog
> status into false.
>
> > On the other hand, per-source config will be necessary if users want to
> > apply different watermark lag thresholds for different sources in the
> same
> > job.
>
> We also have different watermark delay definitions for each source,
> I think this's also reasonable and necessary to have different watermark
> lags.
>
>
> > Each source can have its own rule that specifies when the backlog can be
> true
> > (e.g. MySql CDC says the backlog should be true during the snapshot
> stage).
> > And we can have a job-level config that specifies when the backlog should
> > be true. Note that it is designed in such a way that none of these rules
> > specify when the backlog should be false. That is why there is no
> conflict
> > by definition.
>
> IIUC, FLIP-309 provides `setIsProcessingBacklog` to specify when the
> backlog
> is true and when is FALSE. This conflicts with the job-level config as it
> will turn
> the status into true.
>
> > If I understand your comments correctly, you mean that we might have a
> > Flink SQL DDL with user-defined watermark expressions. And users also
> want
> > to set the backlog to true if the watermark generated by that
> > user-specified expression exceeds a threshold.
>
> No. I mean the source may not support generating watermarks, so the
> watermark
> expression is applied in a following operator (instead of in the source
> operator).
> This will result in the watermark lag doesn't work in this case and confuse
> users.
>
> > You are right that this is a limitation. However, this is only a
> short-term
> > limitation which we added to make sure that we can focus on the
> capability
> > to switch from backlog=true to backlog=false. In the future, we will
> remove
> > this limitation and also support switching from backlog=false to
> > backlog=true.
>
> I can understand it may be difficult to support runtime mode switching back
> and forth.
> However, I think this should be a limitation of FLIP-327, not FLIP-328.
> IIUC,
> FLIP-309 doesn't have this limitation, right? I just don't understand
> what's the
> challenge to switch a flag?
>
> Best,
> Jark
>
>
> On Sun, 10 Sept 2023 at 19:44, Dong Lin  wrote:
>
> > Hi Jark,
> >
> > Thanks for the comments. Please see my comments inline.
> >
> > On Sat, Sep 9, 2023 at 4:13 PM Jark Wu  wrote:
> >
> > > Hi Xuannan,
> > >
> > > I leave my comments inline.
> > >
> > > > In the case where a user wants to
> > > > use a CDC source and also determine backlog status based on watermark
> > > > lag, we still need to define the rule when that occurs
> > >
> > > This rule should be defined by the source itself (who knows backlog
> > best),
> > > not by the framework. In the case of CDC source, it reports
> > isBacklog=true
> > > during snapshot stage, and report isBacklog=false during changelog
> stage
> > if
> > > watermark-lag is within the threshold.
> > >
> >
> > I am not sure I fully understand the difference between adding a
> job-level
> > config vs. adding a per-source config.
> >
> > In the case of CDC, its watermark lag should be either undefined or
> > really large in the snapshot stage. As a result, the proposed job-level
> > config will be applied only in the changelog stage. So there is no
> > difference between these two approaches in this particular case, right?
> >
> > There are two advantages of the job-level config over per-source config:
> >
> > 1) Configuration is simpler. For example, suppose a user has a Flink job
> > that consumes records from multiple Kafka sources and wants to determine
> > backlog status for these Kafka sources using the same watermark lag
> > threshold, there is no need for users to repeatedly specify this
> threshold
> > for each source.
> >
> > 2) There is a smaller number of public APIs overall. In particular,
> instead
> > of repeatedly adding a 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-12 Thread Dong Lin
Hi Jark,

Please see my comments inline.

On Mon, Sep 11, 2023 at 4:41 PM Jark Wu  wrote:

> Hi Dong,
>
> Please see my comments inline.
>
> >  As a result, the proposed job-level
> > config will be applied only in the changelog stage. So there is no
> > difference between these two approaches in this particular case, right?
>
> How the job-level config can be applied ONLY in the changelog stage?
>

Actually, the job-level config is *not* applicable only in the changelog
stage. It depends only on the watermark lag of source operators. Therefore,
it is also applicable to the snapshot stage (or any other stage) as long as
watermark lag is well-defined in that stage.


> I think it is only possible if it is implemented by the CDC source itself,
> because the framework doesn't know which stage of the source is.
> Know that the CDC source may emit watermarks with a very small lag
> in the snapshot stage, and the job-level config may turn the backlog
> status into false.
>

Just to clarify, the job-level config introduced in FLIP-328 will *not* set
backlog to false even if the CDC source emits watermarks with very small lag
in the snapshot stage.

This is because the job-level config introduced in FLIP-328 is only
effective (i.e. sets backlog to true) when watermark lag is high. Given that
CDC source itself has an extra rule that sets backlog to true in the
snapshot stage, that means the backlog will be true even if its watermark
lag is small.
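
The CDC case can be sketched in plain Java as follows. This is only an
illustration under the assumptions stated in this thread (the source's own
rule says true during the snapshot stage, and the job-level rule can only say
true). The class CdcBacklogExample, its Stage enum and its methods are
hypothetical and not part of Flink or the CDC connector.

```java
import java.time.Duration;

/** Hypothetical illustration only; not part of Flink or the MySQL CDC connector. */
final class CdcBacklogExample {
    enum Stage { SNAPSHOT, CHANGELOG }

    /** Source-specific rule: backlog is true during the snapshot stage. */
    static boolean sourceRule(Stage stage) {
        return stage == Stage.SNAPSHOT;
    }

    /** Job-level rule: triggered only when watermark lag exceeds the threshold. */
    static boolean jobLevelRule(Duration watermarkLag, Duration threshold) {
        return watermarkLag.compareTo(threshold) > 0;
    }

    /** The overall status is true if either rule is triggered. */
    static boolean isProcessingBacklog(Stage stage, Duration watermarkLag, Duration threshold) {
        return sourceRule(stage) || jobLevelRule(watermarkLag, threshold);
    }

    public static void main(String[] args) {
        Duration threshold = Duration.ofMinutes(10);
        // Small lag in the snapshot stage: the source rule still keeps backlog true.
        System.out.println(isProcessingBacklog(Stage.SNAPSHOT, Duration.ofSeconds(1), threshold));
        // Small lag in the changelog stage: no rule is triggered.
        System.out.println(isProcessingBacklog(Stage.CHANGELOG, Duration.ofSeconds(1), threshold));
        // Large lag in the changelog stage: the job-level rule is triggered.
        System.out.println(isProcessingBacklog(Stage.CHANGELOG, Duration.ofMinutes(30), threshold));
    }
}
```

Because the job-level rule never votes false, a small watermark lag during the
snapshot stage cannot override the source's own rule.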

Would this address your concern here?


>
> > On the other hand, per-source config will be necessary if users want to
> > apply different watermark lag thresholds for different sources in the
> same
> > job.
>
> We also have different watermark delay definitions for each source,
> I think this's also reasonable and necessary to have different watermark
> lags.
>

Hmm.. can you explain what you mean by "different watermark delay
definitions for each source"?

According to FLIP-33, watermark lag is defined as "the time in milliseconds
that the watermark lags behind the wall clock time". This definition has the
same semantics and value type regardless of the specific source definition.

Do you mean to introduce another definition of watermark lag? If so, can
you explain the use-case and the definitions, so that we can better gauge
whether we should update FLIP-328 based on this idea?


>
> > Each source can have its own rule that specifies when the backlog can be
> true
> > (e.g. MySql CDC says the backlog should be true during the snapshot
> stage).
> > And we can have a job-level config that specifies when the backlog should
> > be true. Note that it is designed in such a way that none of these rules
> > specify when the backlog should be false. That is why there is no
> conflict
> > by definition.
>
> IIUC, FLIP-309 provides `setIsProcessingBacklog` to specify when the
> backlog
> is true and when is FALSE. This conflicts with the job-level config as it
>

I think there is probably misunderstanding here. FLIP-309 does NOT directly
specify when backlog is false. It is intentionally specified in such a way
that there will not be any conflict between these rules.

More specifically, it is mentioned in the Java doc
of setIsProcessingBacklog that "If no API has been explicitly invoked to
specify the backlog status of a source, the source is considered to have
isProcessingBacklog=false by default".

The job-level config introduced in FLIP-328 is also considered an API. So
here is how it works following the definitions in FLIP-309:
if setIsProcessingBacklog(true) is explicitly invoked for a source, or if
the job-level config causes the backlog status to be set to true, then the
overall backlog status is set to true. Otherwise, it is set to false.

We can update the Java doc of setIsProcessingBacklog() to make this
definition/logic clearer.

Would this address your concern?


> will turn
> the status into true.
>
> > If I understand your comments correctly, you mean that we might have a
> > Flink SQL DDL with user-defined watermark expressions. And users also
> want
> > to set the backlog to true if the watermark generated by that
> > user-specified expression exceeds a threshold.
>
> No. I mean the source may not support generating watermarks, so the
> watermark
> expression is applied in a following operator (instead of in the source
> operator).
> This will result in the watermark lag doesn't work in this case and confuse
> users.
>

Can you provide a specific example / use-case that needs users to generate
a watermark in the following operator for a source that does not support
generating watermarks?

Depending on the concrete use-case, maybe the right solution is to update
the source to support generating watermarks.


> > You are right that this is a limitation. However, this is only a
> short-term
> > limitation which we added to make sure that we can focus on the
> capability
> > 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-11 Thread Jark Wu
Hi Dong,

Please see my comments inline.

>  As a result, the proposed job-level
> config will be applied only in the changelog stage. So there is no
> difference between these two approaches in this particular case, right?

How can the job-level config be applied ONLY in the changelog stage?
I think it is only possible if it is implemented by the CDC source itself,
because the framework doesn't know which stage the source is in.
Note that the CDC source may emit watermarks with a very small lag
in the snapshot stage, and the job-level config may turn the backlog
status into false.

> On the other hand, per-source config will be necessary if users want to
> apply different watermark lag thresholds for different sources in the same
> job.

We also have different watermark delay definitions for each source, so
I think it is also reasonable and necessary to have different watermark
lags.


> Each source can have its own rule that specifies when the backlog can be
> true (e.g. MySql CDC says the backlog should be true during the snapshot
> stage). And we can have a job-level config that specifies when the backlog
> should be true. Note that it is designed in such a way that none of these
> rules specify when the backlog should be false. That is why there is no
> conflict by definition.

IIUC, FLIP-309 provides `setIsProcessingBacklog` to specify when the backlog
is true and when it is FALSE. This conflicts with the job-level config, as it
will turn the status into true.

> If I understand your comments correctly, you mean that we might have a
> Flink SQL DDL with user-defined watermark expressions. And users also want
> to set the backlog to true if the watermark generated by that
> user-specified expression exceeds a threshold.

No. I mean the source may not support generating watermarks, so the
watermark expression is applied in a following operator (instead of in the
source operator). As a result, the watermark lag does not work in this case,
which will confuse users.

> You are right that this is a limitation. However, this is only a short-term
> limitation which we added to make sure that we can focus on the capability
> to switch from backlog=true to backlog=false. In the future, we will remove
> this limitation and also support switching from backlog=false to
> backlog=true.

I can understand it may be difficult to support runtime mode switching back
and forth. However, I think this should be a limitation of FLIP-327, not
FLIP-328. IIUC, FLIP-309 doesn't have this limitation, right? I just don't
understand what the challenge is in switching a flag.

Best,
Jark


On Sun, 10 Sept 2023 at 19:44, Dong Lin  wrote:

> Hi Jark,
>
> Thanks for the comments. Please see my comments inline.
>
> On Sat, Sep 9, 2023 at 4:13 PM Jark Wu  wrote:
>
> > Hi Xuannan,
> >
> > I leave my comments inline.
> >
> > > In the case where a user wants to
> > > use a CDC source and also determine backlog status based on watermark
> > > lag, we still need to define the rule when that occurs
> >
> > This rule should be defined by the source itself (who knows backlog
> best),
> > not by the framework. In the case of CDC source, it reports
> isBacklog=true
> > during snapshot stage, and report isBacklog=false during changelog stage
> if
> > watermark-lag is within the threshold.
> >
>
> I am not sure I fully understand the difference between adding a job-level
> config vs. adding a per-source config.
>
> In the case of CDC, its watermark lag should be either undefined or
> really large in the snapshot stage. As a result, the proposed job-level
> config will be applied only in the changelog stage. So there is no
> difference between these two approaches in this particular case, right?
>
> There are two advantages of the job-level config over per-source config:
>
> 1) Configuration is simpler. For example, suppose a user has a Flink job
> that consumes records from multiple Kafka sources and wants to determine
> backlog status for these Kafka sources using the same watermark lag
> threshold, there is no need for users to repeatedly specify this threshold
> for each source.
>
> 2) There is a smaller number of public APIs overall. In particular, instead
> of repeatedly adding a setProcessingBacklogWatermarkLagThreshold() API for
> every source operator that has event-time watermark lag defined, we only
> need to add one job-level config. Less public API means better simplicity
> and maintainability in general.
>
> On the other hand, per-source config will be necessary if users want to
> apply different watermark lag thresholds for different sources in the same
> job. Personally, I find this a bit counter-intuitive for users to specify
> different watermark lag thresholds in the same job.
>
> Do you think there is any real-world use-case that requires this? Could you
> provide a specific use-case where per-source config can provide an
> advantage over the job-level config?
>
>
> > I think it's not intuitive to combine it with the logical OR operation.

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-10 Thread Dong Lin
Hi Jark,

Thanks for the comments. Please see my comments inline.

On Sat, Sep 9, 2023 at 4:13 PM Jark Wu  wrote:

> Hi Xuannan,
>
> I leave my comments inline.
>
> > In the case where a user wants to
> > use a CDC source and also determine backlog status based on watermark
> > lag, we still need to define the rule when that occurs
>
> This rule should be defined by the source itself (who knows backlog best),
> not by the framework. In the case of CDC source, it reports isBacklog=true
> during snapshot stage, and report isBacklog=false during changelog stage if
> watermark-lag is within the threshold.
>

I am not sure I fully understand the difference between adding a job-level
config vs. adding a per-source config.

In the case of CDC, its watermark lag should be either undefined or
really large in the snapshot stage. As a result, the proposed job-level
config will be applied only in the changelog stage. So there is no
difference between these two approaches in this particular case, right?

There are two advantages of the job-level config over per-source config:

1) Configuration is simpler. For example, if a user has a Flink job that
consumes records from multiple Kafka sources and wants to determine the
backlog status for these sources using the same watermark lag threshold,
the user does not need to specify this threshold repeatedly for each
source.

2) There is a smaller number of public APIs overall. In particular, instead
of repeatedly adding a setProcessingBacklogWatermarkLagThreshold() API for
every source operator that has event-time watermark lag defined, we only
need to add one job-level config. Less public API means better simplicity
and maintainability in general.

On the other hand, per-source config will be necessary if users want to
apply different watermark lag thresholds for different sources in the same
job. Personally, I find it a bit counter-intuitive for users to specify
different watermark lag thresholds in the same job.

Do you think there is any real-world use-case that requires this? Could you
provide a specific use-case where per-source config can provide an
advantage over the job-level config?


> I think it's not intuitive to combine it with the logical OR operation.
> Even for the
> combination logic of backlog status from different channels, FLIP-309 said
> it is
> "up to the operator to determine its output records' isBacklog value" and
> proposed
> 3 different strategies. Therefore, I think backlog status from a single
> source should
> be up to the source.


For both the job-level config and the per-source config, it is eventually
up to the user to decide the computation logic of the backlog status.
Whether this mechanism is implemented at the per-source level or framework
level is probably more like an implementation detail.

Eventually, I think the choice between these two approaches depends on
whether we have any use-case for users to specify different watermark lag
thresholds in the same job.


>
> IMO, a better API design is not how to resolve conflicts but not
> introducing conflicts.


Just to clarify, the current FLIP does not introduce any conflict. Each
source can have its own rule that specifies when the backlog can be true
(e.g. MySql CDC says the backlog should be true during the snapshot stage).
And we can have a job-level config that specifies when the backlog should
be true. Note that it is designed in such a way that none of these rules
specify when the backlog should be false. That is why there is no conflict
by definition.
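
As an illustration of why rules that only assert backlog=true cannot
conflict, here is a minimal Java sketch. The names (BacklogRules,
isProcessingBacklog) are hypothetical and are not part of FLIP-309,
FLIP-328, or any Flink API; each rule is modeled as a BooleanSupplier that
returns true only when that rule wants the backlog to be true.

```java
import java.util.List;
import java.util.function.BooleanSupplier;

/** Hypothetical illustration only; not a Flink class. */
final class BacklogRules {

    /**
     * Combines the rules with a logical OR: the backlog status is true if
     * any rule asserts it. A rule returning false only means "this rule
     * does not require backlog=true"; it cannot force the status to false.
     */
    static boolean isProcessingBacklog(List<BooleanSupplier> rules) {
        for (BooleanSupplier rule : rules) {
            if (rule.getAsBoolean()) {
                return true;
            }
        }
        return false;
    }
}
```

Under this assumption, the source-specific rule (e.g. "in snapshot stage")
and the job-level rule (e.g. "watermark lag above threshold") are just two
entries in the list, and the order of the entries does not matter.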



> Let the source determine backlog status removes the conflicts and I don't
> see big
> disadvantages.
>
> > It should not confuse the user that
> > DataStream#assignTimestampsAndWatermarks doesn't work with
> > backlog.watermark-lag-threshold, as it is not a source.
>
> Hmm, so this configuration may confuse Flink SQL users, because all
> watermarks
> are defined on the source DDL, but it may use a separate operator to emit
> watermarks
> if the source doesn't support emitting watermarks.
>

If I understand your comments correctly, you mean that we might have a
Flink SQL DDL with user-defined watermark expressions. And users also want
to set the backlog to true if the watermark generated by that
user-specified expression exceeds a threshold.

That is a good point and use-case. I agree we should also cover this
scenario. And we can update FLIP-328 to mention that the job-level config
will also be applicable when the watermark derived from the Flink SQL DDL
exceeds this threshold. Would this address your concern?


>
> > I think the description in the FLIP actually means the other way
> > around, where the job can never switch back to batch mode once it has
> > switched into streaming mode. This is to align with the current state
> > of FLIP-327[1], where only switching from batch to stream mode is
> > supported.
>
> This sounds like a limitation of FLIP-327 (that execution mode depends on
> backlog status).
> But the 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-09 Thread Jark Wu
Hi Xuannan,

I leave my comments inline.

> In the case where a user wants to
> use a CDC source and also determine backlog status based on watermark
> lag, we still need to define the rule when that occurs

This rule should be defined by the source itself (which knows the backlog
best), not by the framework. In the case of the CDC source, it reports
isBacklog=true during the snapshot stage and reports isBacklog=false during
the changelog stage if the watermark lag is within the threshold.

I think it's not intuitive to combine it with the logical OR operation.
Even for the
combination logic of backlog status from different channels, FLIP-309 said
it is
"up to the operator to determine its output records' isBacklog value" and
proposed
3 different strategies. Therefore, I think backlog status from a single
source should
be up to the source.

IMO, a better API design is one that does not introduce conflicts, rather
than one that defines how to resolve them. Letting the source determine the
backlog status removes the conflicts, and I don't see big disadvantages.

> It should not confuse the user that
> DataStream#assignTimestampsAndWatermarks doesn't work with
> backlog.watermark-lag-threshold, as it is not a source.

Hmm, so this configuration may confuse Flink SQL users, because all
watermarks are defined in the source DDL, but Flink may use a separate
operator to emit watermarks if the source doesn't support emitting them.


> I think the description in the FLIP actually means the other way
> around, where the job can never switch back to batch mode once it has
> switched into streaming mode. This is to align with the current state
> of FLIP-327[1], where only switching from batch to stream mode is
> supported.

This sounds like a limitation of FLIP-327 (that execution mode depends on
backlog status). But the backlog status shouldn't have this limitation,
because it is not only used for execution mode switching.

Best,
Jark



On Fri, 8 Sept 2023 at 19:09, Xuannan Su  wrote:

> Hi Jark and Leonard,
>
> Thanks for the comments. Please see my reply below.
>
> @Jark
>
> > I think a better API doesn't compete with itself. Therefore, I'm in
> favor of
> > supporting the watermark lag threshold for each source without
> introducing
> > any framework API and configuration.
>
> I don't think supporting the watermark lag threshold for each source
> can avoid the competition problem. In the case where a user wants to
> use a CDC source and also determine backlog status based on watermark
> lag, we still need to define the rule when that occurs. With that
> said, I think it is more intuitive to combine it with the logical OR
> operation, as the strategies (FLIP-309, FLIP-328) only determine when
> the source's backlog status should be True. What do you think?
>
> > Besides, this can address another concern that the watermark may be
> > generated by DataStream#assignTimestampsAnd
> > Watermarks which doesn't
> > work with the backlog.watermark-lag-threshold job config
>
> The description of the configuration explicitly states that "a source
> would report isProcessingBacklog=true if its watermark lag exceeds the
> configured value". It should not confuse the user that
> DataStream#assignTimestampsAndWatermarks doesn't work with
> backlog.watermark-lag-threshold, as it is not a source.
>
> > Does that mean the job can never back to streaming mode once switches
> into
> > backlog mode? It sounds like not a complete FLIP to me. Is it possible to
> > support switching back in this FLIP?
>
> I think the description in the FLIP actually means the other way
> around, where the job can never switch back to batch mode once it has
> switched into streaming mode. This is to align with the current state
> of FLIP-327[1], where only switching from batch to stream mode is
> supported.
>
> @Leonard
>
> > > The FLIP describe that: And it should report isProcessingBacklog=false
> at the beginning of the snapshot stage.
> > This should be “changelog stage”
>
> I think the description is in FLIP-309. Thanks for pointing out. I
> updated the description.
>
> > I'm not sure if it's enough to support this feature only in FLIP-27
> Source. Although we are pushing the sourceFunction API to be removed, these
> APIs will be survive one or two versions in flink repo before they are
> actually removed.
>
> I agree that it is good to support the SourceFunction API. However,
> given that the SourceFunction API is marked as deprecated, I think I
> will prioritize supporting the FLIP-27 Source. We can support the
> SourceFunction API after the
> FLIP-27 source. What do you think?
>
> Best regards,
> Xuannan
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data
>
>
>
>
> On Fri, Sep 8, 2023 at 1:02 AM Leonard Xu  wrote:
> >
> > Thanks Xuannan for driving this FLIP !
> >
> > The proposal generally looks good to me, but I still left some comments:
> >
> > > One more question about the FLIP is that the 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-08 Thread Xuannan Su
Hi Jark and Leonard,

Thanks for the comments. Please see my reply below.

@Jark

> I think a better API doesn't compete with itself. Therefore, I'm in favor of
> supporting the watermark lag threshold for each source without introducing
> any framework API and configuration.

I don't think supporting the watermark lag threshold for each source
can avoid the competition problem. In the case where a user wants to
use a CDC source and also determine backlog status based on watermark
lag, we still need to define the rule when that occurs. With that
said, I think it is more intuitive to combine it with the logical OR
operation, as the strategies (FLIP-309, FLIP-328) only determine when
the source's backlog status should be True. What do you think?

> Besides, this can address another concern that the watermark may be
> generated by DataStream#assignTimestampsAndWatermarks which doesn't
> work with the backlog.watermark-lag-threshold job config

The description of the configuration explicitly states that "a source
would report isProcessingBacklog=true if its watermark lag exceeds the
configured value". It should not confuse the user that
DataStream#assignTimestampsAndWatermarks doesn't work with
backlog.watermark-lag-threshold, as it is not a source.
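
To make the quoted rule concrete, here is a minimal Java sketch of the
check. It assumes that "watermark lag" means the wall-clock time minus the
source's current watermark; the class and method names (WatermarkLagCheck,
exceedsThreshold) are hypothetical and not taken from the FLIP.

```java
import java.time.Duration;

/** Hypothetical illustration only; not a Flink class. */
final class WatermarkLagCheck {

    /**
     * Returns true if the watermark trails the current time by more than
     * the configured threshold, i.e. the source would be treated as
     * processing backlog under the rule quoted above.
     */
    static boolean exceedsThreshold(long currentTimeMs, long watermarkMs, Duration threshold) {
        long lagMs = currentTimeMs - watermarkMs; // assumed definition of watermark lag
        return lagMs > threshold.toMillis();
    }
}
```

Because the check needs the source's own watermark, an operator that only
assigns watermarks downstream of the source has nothing to feed into it.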

> Does that mean the job can never back to streaming mode once switches into
> backlog mode? It sounds like not a complete FLIP to me. Is it possible to
> support switching back in this FLIP?

I think the description in the FLIP actually means the other way
around, where the job can never switch back to batch mode once it has
switched into streaming mode. This is to align with the current state
of FLIP-327[1], where only switching from batch to stream mode is
supported.
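
A minimal Java sketch of this one-way behavior, under the assumption that
the restriction can be modeled as a latch: once a source has reported
isProcessingBacklog=false, later true reports are ignored. The class name
OneWayBacklogStatus is hypothetical and not part of FLIP-327 or FLIP-328.

```java
/** Hypothetical illustration only; not a Flink class. */
final class OneWayBacklogStatus {

    // Set once the status has switched to "not backlog" (streaming mode).
    private boolean switchedToStreaming = false;

    /**
     * Applies a newly reported status and returns the effective one.
     * Switching from true to false is allowed; switching back is not.
     */
    boolean update(boolean reportedBacklog) {
        if (!reportedBacklog) {
            switchedToStreaming = true;
        }
        return reportedBacklog && !switchedToStreaming;
    }
}
```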

@Leonard

> > The FLIP describe that: And it should report isProcessingBacklog=false at 
> > the beginning of the snapshot stage.
> This should be “changelog stage”

I think the description is in FLIP-309. Thanks for pointing that out. I
have updated the description.

> I'm not sure if it's enough to support this feature only in FLIP-27 Source. 
> Although we are pushing the sourceFunction API to be removed, these APIs will 
> be survive one or two versions in flink repo before they are actually removed.

I agree that it is good to support the SourceFunction API. However,
given that the SourceFunction API is marked as deprecated, I think I
will prioritize supporting the FLIP-27 Source. We can support the
SourceFunction API after the FLIP-27 source. What do you think?

Best regards,
Xuannan

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data




On Fri, Sep 8, 2023 at 1:02 AM Leonard Xu  wrote:
>
> Thanks Xuannan for driving this FLIP !
>
> The proposal generally looks good to me, but I still left some comments:
>
> > One more question about the FLIP is that the FLIP says "Note that this
> > config does not support switching source's isProcessingBacklog from false 
> > to true
> > for now.” Does that mean the job can never back to streaming mode once 
> > switches into
> > backlog mode? It sounds like not a complete FLIP to me. Is it possible to
> > support switching back in this FLIP?
> +1 for Jark’s concern, IIUC, the state transition of IsProcessingBacklog 
> depends on whether the data in the source is processing backlog data or not. 
> Different sources will have different backlog status and which may change 
> over time. From a general perspective, we should not have this restriction.
>
> > The FLIP describe that: And it should report isProcessingBacklog=false at 
> > the beginning of the snapshot stage.
> This should be “changelog stage”
>
> I'm not sure if it's enough to support this feature only in FLIP-27 Source. 
> Although we are pushing the sourceFunction API to be removed, these APIs will 
> be survive one or two versions in flink repo before they are actually removed.
>
> Best,
> Leonard
>
> >
> > Best,
> > Jark
> >
> >
> > On Thu, 7 Sept 2023 at 13:51, Xuannan Su  wrote:
> >
> >> Hi all,
> >>
> >> Thank you for all the reviews and suggestions.
> >>
> >> I believe all the comments have been addressed. If there are no
> >> further comments, I plan to open the voting thread for this FLIP early
> >> next week.
> >>
> >> Best regards,
> >> Xuannan
> >>
> >> On Thu, Sep 7, 2023 at 12:09 AM Jing Ge 
> >> wrote:
> >>>
> >>> Hi Xuannan,
> >>>
> >>> I thought FLIP-328 will compete with FLIP-309 while setting the value of
> >>> the backlog. Understood. Thanks for the hint.
> >>>
> >>> Best regards,
> >>> Jing
> >>>
> >>> On Wed, Sep 6, 2023 at 12:12 PM Xuannan Su 
> >> wrote:
> >>>
>  Hi Jing,
> 
>  Thank you for the clarification.
> 
>  For the use case you mentioned, I believe we can utilize the
>  HybridSource, as updated in FLIP-309[1], to determine the backlog
>  status. For example, if the user wants to process data before time T
>  in batch mode and 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-07 Thread Leonard Xu
Thanks Xuannan for driving this FLIP !

The proposal generally looks good to me, but I still left some comments: 

> One more question about the FLIP is that the FLIP says "Note that this
> config does not support switching source's isProcessingBacklog from false to 
> true
> for now.” Does that mean the job can never back to streaming mode once 
> switches into
> backlog mode? It sounds like not a complete FLIP to me. Is it possible to
> support switching back in this FLIP?
+1 for Jark’s concern. IIUC, the state transition of isProcessingBacklog
depends on whether the source is processing backlog data or not.
Different sources will have different backlog statuses, which may change
over time. From a general perspective, we should not have this restriction.

> The FLIP describe that: And it should report isProcessingBacklog=false at the 
> beginning of the snapshot stage.
This should be “changelog stage”

I'm not sure if it's enough to support this feature only in the FLIP-27
Source. Although we are pushing for the SourceFunction API to be removed,
these APIs will survive for one or two versions in the Flink repo before
they are actually removed.

Best,
Leonard

> 
> Best,
> Jark
> 
> 
> On Thu, 7 Sept 2023 at 13:51, Xuannan Su  wrote:
> 
>> Hi all,
>> 
>> Thank you for all the reviews and suggestions.
>> 
>> I believe all the comments have been addressed. If there are no
>> further comments, I plan to open the voting thread for this FLIP early
>> next week.
>> 
>> Best regards,
>> Xuannan
>> 
>> On Thu, Sep 7, 2023 at 12:09 AM Jing Ge 
>> wrote:
>>> 
>>> Hi Xuannan,
>>> 
>>> I thought FLIP-328 will compete with FLIP-309 while setting the value of
>>> the backlog. Understood. Thanks for the hint.
>>> 
>>> Best regards,
>>> Jing
>>> 
>>> On Wed, Sep 6, 2023 at 12:12 PM Xuannan Su 
>> wrote:
>>> 
>>>> Hi Jing,
>>>>
>>>> Thank you for the clarification.
>>>>
>>>> For the use case you mentioned, I believe we can utilize the
>>>> HybridSource, as updated in FLIP-309[1], to determine the backlog
>>>> status. For example, if the user wants to process data before time T
>>>> in batch mode and after time T in stream mode, they can set the first
>>>> source of the HybridSource to read up to time T and the last source of
>>>> the HybridSource to read from time T.
>>>>
>>>> Best,
>>>> Xuannan
>>>>
>>>> [1]
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
>>>>
>>>>
>>>> On Mon, Sep 4, 2023 at 10:36 PM Jing Ge wrote:
> 
> Hi Xuannan,
> 
> Thanks for the clarification.
> 
> 3. Event time and process time are two different things. It might be
 rarely
> used, but conceptually, users can process data in the past within a
> specific time range in the streaming mode. All data before that range
 will
> be considered as backlog and needed to be processed in the batch
>> mode,
> like, e.g. the Present Perfect Progressive tense used in English
 language.
> 
> Best regards,
> Jing
> 
> On Thu, Aug 31, 2023 at 4:45 AM Xuannan Su 
 wrote:
> 
>> Hi Jing,
>> 
>> Thanks for the reply.
>> 
>> 1. You are absolutely right that the watermark lag threshold must
>> be
>> carefully set with a thorough understanding of watermark
>> generation.
 It is
>> crucial for users to take into account the WatermarkStrategy when
 setting
>> the watermark lag threshold.
>> 
>> 2. Regarding pure processing-time based stream processing jobs,
>> alternative strategies will be implemented to determine whether the
 job is
>> processing backlog data. I have outlined two possible strategies
>> below:
>> 
>> - Based on the source operator's state. For example, when MySQL CDC
 source
>> is reading snapshot, it can claim isBacklog=true.
>> - Based on metrics. For example, when busyTimeMsPerSecond (or
>> backPressuredTimeMsPerSecond) > user_specified_threshold, then
>> isBacklog=true.
>> 
>> As of the strategies proposed in this FLIP, it rely on generated
>> watermarks. Therefore, if a user intends for the job to detect
>> backlog
>> status based on watermark, it is necessary to generate the
>> watermark.
>> 
>> 3. I'm afraid I'm not fully grasping your question. From my
 understanding,
>> it should work in both cases. When event times are close to the
 processing
>> time, resulting in watermarks close to the processing time, the
>> job is
 not
>> processing backlog data. On the other hand, when event times are
>> far
 from
>> processing time, causing watermarks to also be distant, if the lag
>> surpasses the defined threshold, the job is considered processing
 backlog
>> data.
>> 
>> Best,
>> Xuannan
>> 
>> 
>>> On Aug 31, 2023, at 02:56, Jing Ge 
 wrote:
>>> 
>>> Hi Xuannan,
>>> 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-07 Thread Jark Wu
Hi Xuannan,

I think Hang raised a good question about the competition problem between
SplitEnumeratorContext#setIsProcessingBacklog and
backlog.watermark-lag-threshold.
Yes, you can define a rule for the priority, but some people may still
misunderstand it (my gut feeling is that it's an overwrite rule).

I think a better API doesn't compete with itself. Therefore, I'm in favor
of supporting the watermark lag threshold for each source without
introducing any framework API or configuration. So KafkaSourceBuilder can
have a setProcessingBacklogWatermarkLagThreshold method which uses
SplitEnumeratorContext#setIsProcessingBacklog under the hood. Actually,
this is the approach in the Rejected Alternatives of this FLIP.

Besides, this can address another concern: the watermark may be
generated by DataStream#assignTimestampsAndWatermarks, which doesn't
work with the backlog.watermark-lag-threshold job config (it is only applied
to the FLIP-27 SourceOperator). This behavior may confuse users a lot. If we
support the watermark lag threshold per source, then only sources with
watermarking can have a watermark lag threshold, which avoids this problem.
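
A rough Java sketch of what such a per-source threshold could look like.
Everything here is an assumption for illustration: BacklogReporter is a
stand-in for the single SplitEnumeratorContext#setIsProcessingBacklog
method, and PerSourceLagThreshold is a hypothetical helper, not an actual
KafkaSource or Flink class.

```java
import java.time.Duration;

/** Stand-in for SplitEnumeratorContext#setIsProcessingBacklog (hypothetical). */
interface BacklogReporter {
    void setIsProcessingBacklog(boolean isProcessingBacklog);
}

/** Hypothetical per-source helper; the source alone decides its backlog status. */
final class PerSourceLagThreshold {
    private final Duration threshold;
    private final BacklogReporter reporter;

    PerSourceLagThreshold(Duration threshold, BacklogReporter reporter) {
        this.threshold = threshold;
        this.reporter = reporter;
    }

    /** Called by the source whenever it observes a new watermark lag. */
    void onWatermarkLag(Duration lag) {
        // Backlog is true only while the lag is above this source's threshold.
        reporter.setIsProcessingBacklog(lag.compareTo(threshold) > 0);
    }
}
```

In this shape there is a single writer of the backlog status per source, so
no priority rule between a source API and a job-level config is needed.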

One more question about the FLIP: it says "Note that this config does not
support switching source's isProcessingBacklog from false to true for now."
Does that mean the job can never switch back to streaming mode once it
switches into backlog mode? That does not sound like a complete FLIP to me.
Is it possible to support switching back in this FLIP?

Best,
Jark


On Thu, 7 Sept 2023 at 13:51, Xuannan Su  wrote:

> Hi all,
>
> Thank you for all the reviews and suggestions.
>
> I believe all the comments have been addressed. If there are no
> further comments, I plan to open the voting thread for this FLIP early
> next week.
>
> Best regards,
> Xuannan
>
> On Thu, Sep 7, 2023 at 12:09 AM Jing Ge 
> wrote:
> >
> > Hi Xuannan,
> >
> > I thought FLIP-328 will compete with FLIP-309 while setting the value of
> > the backlog. Understood. Thanks for the hint.
> >
> > Best regards,
> > Jing
> >
> > On Wed, Sep 6, 2023 at 12:12 PM Xuannan Su 
> wrote:
> >
> > > Hi Jing,
> > >
> > > Thank you for the clarification.
> > >
> > > For the use case you mentioned, I believe we can utilize the
> > > HybridSource, as updated in FLIP-309[1], to determine the backlog
> > > status. For example, if the user wants to process data before time T
> > > in batch mode and after time T in stream mode, they can set the first
> > > source of the HybridSource to read up to time T and the last source of
> > > the HybridSource to read from time T.
> > >
> > > Best,
> > > Xuannan
> > >
> > > [1]
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > >
> > >
> > > On Mon, Sep 4, 2023 at 10:36 PM Jing Ge 
> > > wrote:
> > > >
> > > > Hi Xuannan,
> > > >
> > > > Thanks for the clarification.
> > > >
> > > > 3. Event time and process time are two different things. It might be
> > > rarely
> > > > used, but conceptually, users can process data in the past within a
> > > > specific time range in the streaming mode. All data before that range
> > > will
> > > > be considered as backlog and needed to be processed in the batch
> mode,
> > > > like, e.g. the Present Perfect Progressive tense used in English
> > > language.
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > > On Thu, Aug 31, 2023 at 4:45 AM Xuannan Su 
> > > wrote:
> > > >
> > > > > Hi Jing,
> > > > >
> > > > > Thanks for the reply.
> > > > >
> > > > > 1. You are absolutely right that the watermark lag threshold must
> be
> > > > > carefully set with a thorough understanding of watermark
> generation.
> > > It is
> > > > > crucial for users to take into account the WatermarkStrategy when
> > > setting
> > > > > the watermark lag threshold.
> > > > >
> > > > > 2. Regarding pure processing-time based stream processing jobs,
> > > > > alternative strategies will be implemented to determine whether the
> > > job is
> > > > > processing backlog data. I have outlined two possible strategies
> below:
> > > > >
> > > > > - Based on the source operator's state. For example, when MySQL CDC
> > > source
> > > > > is reading snapshot, it can claim isBacklog=true.
> > > > > - Based on metrics. For example, when busyTimeMsPerSecond (or
> > > > > backPressuredTimeMsPerSecond) > user_specified_threshold, then
> > > > > isBacklog=true.
> > > > >
> > > > > As of the strategies proposed in this FLIP, it rely on generated
> > > > > watermarks. Therefore, if a user intends for the job to detect
> backlog
> > > > > status based on watermark, it is necessary to generate the
> watermark.
> > > > >
> > > > > 3. I'm afraid I'm not fully grasping your question. From my
> > > understanding,
> > > > > it should work in both cases. When event times are close to the
> > > processing
> > > > > time, resulting in watermarks close to the 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-06 Thread Xuannan Su
Hi all,

Thank you for all the reviews and suggestions.

I believe all the comments have been addressed. If there are no
further comments, I plan to open the voting thread for this FLIP early
next week.

Best regards,
Xuannan

On Thu, Sep 7, 2023 at 12:09 AM Jing Ge  wrote:
>
> Hi Xuannan,
>
> I thought FLIP-328 will compete with FLIP-309 while setting the value of
> the backlog. Understood. Thanks for the hint.
>
> Best regards,
> Jing
>
> On Wed, Sep 6, 2023 at 12:12 PM Xuannan Su  wrote:
>
> > Hi Jing,
> >
> > Thank you for the clarification.
> >
> > For the use case you mentioned, I believe we can utilize the
> > HybridSource, as updated in FLIP-309[1], to determine the backlog
> > status. For example, if the user wants to process data before time T
> > in batch mode and after time T in stream mode, they can set the first
> > source of the HybridSource to read up to time T and the last source of
> > the HybridSource to read from time T.
> >
> > Best,
> > Xuannan
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> >
> >
> > On Mon, Sep 4, 2023 at 10:36 PM Jing Ge 
> > wrote:
> > >
> > > Hi Xuannan,
> > >
> > > Thanks for the clarification.
> > >
> > > 3. Event time and process time are two different things. It might be
> > rarely
> > > used, but conceptually, users can process data in the past within a
> > > specific time range in the streaming mode. All data before that range
> > will
> > > be considered as backlog and needed to be processed in the batch mode,
> > > like, e.g. the Present Perfect Progressive tense used in English
> > language.
> > >
> > > Best regards,
> > > Jing
> > >
> > > On Thu, Aug 31, 2023 at 4:45 AM Xuannan Su 
> > wrote:
> > >
> > > > Hi Jing,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > 1. You are absolutely right that the watermark lag threshold must be
> > > > carefully set with a thorough understanding of watermark generation.
> > It is
> > > > crucial for users to take into account the WatermarkStrategy when
> > setting
> > > > the watermark lag threshold.
> > > >
> > > > 2. Regarding pure processing-time based stream processing jobs,
> > > > alternative strategies will be implemented to determine whether the
> > job is
> > > > processing backlog data. I have outlined two possible strategies below:
> > > >
> > > > - Based on the source operator's state. For example, when MySQL CDC
> > source
> > > > is reading snapshot, it can claim isBacklog=true.
> > > > - Based on metrics. For example, when busyTimeMsPerSecond (or
> > > > backPressuredTimeMsPerSecond) > user_specified_threshold, then
> > > > isBacklog=true.
> > > >
> > > > As of the strategies proposed in this FLIP, it rely on generated
> > > > watermarks. Therefore, if a user intends for the job to detect backlog
> > > > status based on watermark, it is necessary to generate the watermark.
> > > >
> > > > 3. I'm afraid I'm not fully grasping your question. From my
> > understanding,
> > > > it should work in both cases. When event times are close to the
> > processing
> > > > time, resulting in watermarks close to the processing time, the job is
> > not
> > > > processing backlog data. On the other hand, when event times are far
> > from
> > > > processing time, causing watermarks to also be distant, if the lag
> > > > surpasses the defined threshold, the job is considered processing
> > backlog
> > > > data.
> > > >
> > > > Best,
> > > > Xuannan
> > > >
> > > >
> > > > > On Aug 31, 2023, at 02:56, Jing Ge 
> > wrote:
> > > > >
> > > > > Hi Xuannan,
> > > > >
> > > > > Thanks for the clarification. That is the part where I am trying to
> > > > > understand your thoughts. I have some follow-up questions:
> > > > >
> > > > > 1. It depends strongly on the watermarkStrategy and how customized
> > > > > watermark generation looks like. It mixes business logic with
> > technical
> > > > > implementation and technical data processing mode. The value of the
> > > > > watermark lag threshold must be set very carefully. If the value is
> > too
> > > > > small. any time, when the watermark generation logic is
> > changed(business
> > > > > logic changes lead to the threshold getting exceeded), the same job
> > might
> > > > > be running surprisingly in backlog processing mode, i.e. a butterfly
> > > > > effect. A comprehensive documentation is required to avoid any
> > confusion
> > > > > for the users.
> > > > > 2. Like Jark already mentioned, use cases that do not have
> > watermarks,
> > > > > like pure processing-time based stream processing[1] are not
> > covered. It
> > > > is
> > > > > more or less a trade-off solution that does not support such use
> > cases
> > > > and
> > > > > appropriate documentation is required. Forcing them to explicitly
> > > > generate
> > > > > watermarks that are never needed just because of this does not sound
> > > > like a
> > > > > proper solution.
> > > > > 3. If I am not 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-06 Thread Jing Ge
Hi Xuannan,

I thought FLIP-328 would compete with FLIP-309 when setting the backlog
status. Understood. Thanks for the hint.

Best regards,
Jing

On Wed, Sep 6, 2023 at 12:12 PM Xuannan Su  wrote:

> Hi Jing,
>
> Thank you for the clarification.
>
> For the use case you mentioned, I believe we can utilize the
> HybridSource, as updated in FLIP-309[1], to determine the backlog
> status. For example, if the user wants to process data before time T
> in batch mode and after time T in stream mode, they can set the first
> source of the HybridSource to read up to time T and the last source of
> the HybridSource to read from time T.
>
> Best,
> Xuannan
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
>
>
> On Mon, Sep 4, 2023 at 10:36 PM Jing Ge 
> wrote:
> >
> > Hi Xuannan,
> >
> > Thanks for the clarification.
> >
> > 3. Event time and process time are two different things. It might be
> rarely
> > used, but conceptually, users can process data in the past within a
> > specific time range in the streaming mode. All data before that range
> will
> > be considered as backlog and needed to be processed in the batch mode,
> > like, e.g. the Present Perfect Progressive tense used in English
> language.
> >
> > Best regards,
> > Jing
> >
> > On Thu, Aug 31, 2023 at 4:45 AM Xuannan Su 
> wrote:
> >
> > > Hi Jing,
> > >
> > > Thanks for the reply.
> > >
> > > 1. You are absolutely right that the watermark lag threshold must be
> > > carefully set with a thorough understanding of watermark generation.
> It is
> > > crucial for users to take into account the WatermarkStrategy when
> setting
> > > the watermark lag threshold.
> > >
> > > 2. Regarding pure processing-time based stream processing jobs,
> > > alternative strategies will be implemented to determine whether the
> job is
> > > processing backlog data. I have outlined two possible strategies below:
> > >
> > > - Based on the source operator's state. For example, when MySQL CDC
> source
> > > is reading snapshot, it can claim isBacklog=true.
> > > - Based on metrics. For example, when busyTimeMsPerSecond (or
> > > backPressuredTimeMsPerSecond) > user_specified_threshold, then
> > > isBacklog=true.
> > >
> > > As of the strategies proposed in this FLIP, it rely on generated
> > > watermarks. Therefore, if a user intends for the job to detect backlog
> > > status based on watermark, it is necessary to generate the watermark.
> > >
> > > 3. I'm afraid I'm not fully grasping your question. From my
> understanding,
> > > it should work in both cases. When event times are close to the
> processing
> > > time, resulting in watermarks close to the processing time, the job is
> not
> > > processing backlog data. On the other hand, when event times are far
> from
> > > processing time, causing watermarks to also be distant, if the lag
> > > surpasses the defined threshold, the job is considered processing
> backlog
> > > data.
> > >
> > > Best,
> > > Xuannan
> > >
> > >
> > > > On Aug 31, 2023, at 02:56, Jing Ge 
> wrote:
> > > >
> > > > Hi Xuannan,
> > > >
> > > > Thanks for the clarification. That is the part where I am trying to
> > > > understand your thoughts. I have some follow-up questions:
> > > >
> > > > 1. It depends strongly on the watermarkStrategy and how customized
> > > > watermark generation looks like. It mixes business logic with
> technical
> > > > implementation and technical data processing mode. The value of the
> > > > watermark lag threshold must be set very carefully. If the value is
> too
> > > > small. any time, when the watermark generation logic is
> changed(business
> > > > logic changes lead to the threshold getting exceeded), the same job
> might
> > > > be running surprisingly in backlog processing mode, i.e. a butterfly
> > > > effect. A comprehensive documentation is required to avoid any
> confusion
> > > > for the users.
> > > > 2. Like Jark already mentioned, use cases that do not have
> watermarks,
> > > > like pure processing-time based stream processing[1] are not
> covered. It
> > > is
> > > > more or less a trade-off solution that does not support such use
> cases
> > > and
> > > > appropriate documentation is required. Forcing them to explicitly
> > > generate
> > > > watermarks that are never needed just because of this does not sound
> > > like a
> > > > proper solution.
> > > > 3. If I am not mistaken, it only works for use cases where event
> times
> > > are
> > > > very close to the processing times, because the wall clock is used to
> > > > calculate the watermark lag and the watermark is generated based on
> the
> > > > event time.
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > > [1]
> > > >
> > >
> https://github.com/apache/flink/blob/2c50b4e956305426f478b726d4de4a640a16b810/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L236
> > > >
> > > > On Wed, Aug 30, 2023 at 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-06 Thread Xuannan Su
Hi Jing,

Thank you for the clarification.

For the use case you mentioned, I believe we can utilize the
HybridSource, as updated in FLIP-309[1], to determine the backlog
status. For example, if the user wants to process data before time T
in batch mode and after time T in stream mode, they can set the first
source of the HybridSource to read up to time T and the last source of
the HybridSource to read from time T.
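
A minimal sketch of the idea above, in plain Java with no Flink dependency. It assumes, per a reading of FLIP-309, that a HybridSource reports backlog while it reads any source other than the last one. The class `HybridBacklogModel` and its methods are hypothetical names used for illustration only; they are not Flink API.

```java
// Hypothetical model, not Flink API: tracks which sub-source of a
// hybrid source is active and derives the backlog status from it.
final class HybridBacklogModel {
    private final int sourceCount;
    private int currentIndex = 0;

    HybridBacklogModel(int sourceCount) {
        if (sourceCount < 1) {
            throw new IllegalArgumentException("need at least one source");
        }
        this.sourceCount = sourceCount;
    }

    // Called when the active sub-source is exhausted (e.g. the bounded
    // source that reads up to time T has finished).
    void switchToNextSource() {
        if (currentIndex < sourceCount - 1) {
            currentIndex++;
        }
    }

    // Assumption: backlog is true for every sub-source except the last.
    boolean isProcessingBacklog() {
        return currentIndex < sourceCount - 1;
    }
}
```

With two sub-sources, the model reports backlog while the first one (data before time T) is read and stops reporting it once the switch to the second one (data from time T) happens.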

Best,
Xuannan

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog


On Mon, Sep 4, 2023 at 10:36 PM Jing Ge  wrote:
>
> Hi Xuannan,
>
> Thanks for the clarification.
>
> 3. Event time and process time are two different things. It might be rarely
> used, but conceptually, users can process data in the past within a
> specific time range in the streaming mode. All data before that range will
> be considered as backlog and needed to be processed in the batch mode,
> like, e.g. the Present Perfect Progressive tense used in English language.
>
> Best regards,
> Jing
>
> On Thu, Aug 31, 2023 at 4:45 AM Xuannan Su  wrote:
>
> > Hi Jing,
> >
> > Thanks for the reply.
> >
> > 1. You are absolutely right that the watermark lag threshold must be
> > carefully set with a thorough understanding of watermark generation. It is
> > crucial for users to take into account the WatermarkStrategy when setting
> > the watermark lag threshold.
> >
> > 2. Regarding pure processing-time based stream processing jobs,
> > alternative strategies will be implemented to determine whether the job is
> > processing backlog data. I have outlined two possible strategies below:
> >
> > - Based on the source operator's state. For example, when MySQL CDC source
> > is reading snapshot, it can claim isBacklog=true.
> > - Based on metrics. For example, when busyTimeMsPerSecond (or
> > backPressuredTimeMsPerSecond) > user_specified_threshold, then
> > isBacklog=true.
> >
> > As of the strategies proposed in this FLIP, it rely on generated
> > watermarks. Therefore, if a user intends for the job to detect backlog
> > status based on watermark, it is necessary to generate the watermark.
> >
> > 3. I'm afraid I'm not fully grasping your question. From my understanding,
> > it should work in both cases. When event times are close to the processing
> > time, resulting in watermarks close to the processing time, the job is not
> > processing backlog data. On the other hand, when event times are far from
> > processing time, causing watermarks to also be distant, if the lag
> > surpasses the defined threshold, the job is considered processing backlog
> > data.
> >
> > Best,
> > Xuannan
> >
> >
> > > On Aug 31, 2023, at 02:56, Jing Ge  wrote:
> > >
> > > Hi Xuannan,
> > >
> > > Thanks for the clarification. That is the part where I am trying to
> > > understand your thoughts. I have some follow-up questions:
> > >
> > > 1. It depends strongly on the watermarkStrategy and how customized
> > > watermark generation looks like. It mixes business logic with technical
> > > implementation and technical data processing mode. The value of the
> > > watermark lag threshold must be set very carefully. If the value is too
> > > small. any time, when the watermark generation logic is changed(business
> > > logic changes lead to the threshold getting exceeded), the same job might
> > > be running surprisingly in backlog processing mode, i.e. a butterfly
> > > effect. A comprehensive documentation is required to avoid any confusion
> > > for the users.
> > > 2. Like Jark already mentioned, use cases that do not have watermarks,
> > > like pure processing-time based stream processing[1] are not covered. It
> > is
> > > more or less a trade-off solution that does not support such use cases
> > and
> > > appropriate documentation is required. Forcing them to explicitly
> > generate
> > > watermarks that are never needed just because of this does not sound
> > like a
> > > proper solution.
> > > 3. If I am not mistaken, it only works for use cases where event times
> > are
> > > very close to the processing times, because the wall clock is used to
> > > calculate the watermark lag and the watermark is generated based on the
> > > event time.
> > >
> > > Best regards,
> > > Jing
> > >
> > > [1]
> > >
> > https://github.com/apache/flink/blob/2c50b4e956305426f478b726d4de4a640a16b810/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L236
> > >
> > > On Wed, Aug 30, 2023 at 4:06 AM Xuannan Su 
> > wrote:
> > >
> > >> Hi Jing,
> > >>
> > >> Thank you for the suggestion.
> > >>
> > >> The definition of watermark lag is the same as the watermarkLag metric
> > in
> > >> FLIP-33[1]. More specifically, the watermark lag calculation is
> > computed at
> > >> the time when a watermark is emitted downstream in the following way:
> > >> watermarkLag = CurrentTime - Watermark. I have added this description to
> > >> the FLIP.
> > >>
> > >> I hope 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-04 Thread Jing Ge
Hi Xuannan,

Thanks for the clarification.

3. Event time and processing time are two different things. It might be
rarely used, but conceptually, users can process data in the past within a
specific time range in streaming mode. All data before that range would
be considered backlog and would need to be processed in batch mode, much
like the Present Perfect Progressive tense in English.

Best regards,
Jing

On Thu, Aug 31, 2023 at 4:45 AM Xuannan Su  wrote:

> Hi Jing,
>
> Thanks for the reply.
>
> 1. You are absolutely right that the watermark lag threshold must be
> carefully set with a thorough understanding of watermark generation. It is
> crucial for users to take into account the WatermarkStrategy when setting
> the watermark lag threshold.
>
> 2. Regarding pure processing-time based stream processing jobs,
> alternative strategies will be implemented to determine whether the job is
> processing backlog data. I have outlined two possible strategies below:
>
> - Based on the source operator's state. For example, when MySQL CDC source
> is reading snapshot, it can claim isBacklog=true.
> - Based on metrics. For example, when busyTimeMsPerSecond (or
> backPressuredTimeMsPerSecond) > user_specified_threshold, then
> isBacklog=true.
>
> As of the strategies proposed in this FLIP, it rely on generated
> watermarks. Therefore, if a user intends for the job to detect backlog
> status based on watermark, it is necessary to generate the watermark.
>
> 3. I'm afraid I'm not fully grasping your question. From my understanding,
> it should work in both cases. When event times are close to the processing
> time, resulting in watermarks close to the processing time, the job is not
> processing backlog data. On the other hand, when event times are far from
> processing time, causing watermarks to also be distant, if the lag
> surpasses the defined threshold, the job is considered processing backlog
> data.
>
> Best,
> Xuannan
>
>
> > On Aug 31, 2023, at 02:56, Jing Ge  wrote:
> >
> > Hi Xuannan,
> >
> > Thanks for the clarification. That is the part where I am trying to
> > understand your thoughts. I have some follow-up questions:
> >
> > 1. It depends strongly on the watermarkStrategy and how customized
> > watermark generation looks like. It mixes business logic with technical
> > implementation and technical data processing mode. The value of the
> > watermark lag threshold must be set very carefully. If the value is too
> > small. any time, when the watermark generation logic is changed(business
> > logic changes lead to the threshold getting exceeded), the same job might
> > be running surprisingly in backlog processing mode, i.e. a butterfly
> > effect. A comprehensive documentation is required to avoid any confusion
> > for the users.
> > 2. Like Jark already mentioned, use cases that do not have watermarks,
> > like pure processing-time based stream processing[1] are not covered. It
> is
> > more or less a trade-off solution that does not support such use cases
> and
> > appropriate documentation is required. Forcing them to explicitly
> generate
> > watermarks that are never needed just because of this does not sound
> like a
> > proper solution.
> > 3. If I am not mistaken, it only works for use cases where event times
> are
> > very close to the processing times, because the wall clock is used to
> > calculate the watermark lag and the watermark is generated based on the
> > event time.
> >
> > Best regards,
> > Jing
> >
> > [1]
> >
> https://github.com/apache/flink/blob/2c50b4e956305426f478b726d4de4a640a16b810/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L236
> >
> > On Wed, Aug 30, 2023 at 4:06 AM Xuannan Su 
> wrote:
> >
> >> Hi Jing,
> >>
> >> Thank you for the suggestion.
> >>
> >> The definition of watermark lag is the same as the watermarkLag metric
> in
> >> FLIP-33[1]. More specifically, the watermark lag calculation is
> computed at
> >> the time when a watermark is emitted downstream in the following way:
> >> watermarkLag = CurrentTime - Watermark. I have added this description to
> >> the FLIP.
> >>
> >> I hope this addresses your concern.
> >>
> >> Best,
> >> Xuannan
> >>
> >> [1]
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> >>
> >>
> >>> On Aug 28, 2023, at 01:04, Jing Ge  wrote:
> >>>
> >>> Hi Xuannan,
> >>>
> >>> Thanks for the proposal. +1 for me.
> >>>
> >>> There is one tiny thing that I am not sure if I understand it
> correctly.
> >>> Since there will be many different WatermarkStrategies and different
> >>> WatermarkGenerators. Could you please update the FLIP and add the
> >>> description of how the watermark lag is calculated exactly? E.g.
> >> Watermark
> >>> lag = A - B with A is the timestamp of the watermark emitted to the
> >>> downstream and B is(this is the part I am not really sure after
> >> reading
> >>> the FLIP).
> >>>
> >>> Best regards,
> >>> Jing

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-31 Thread Xuannan Su
Hi Hang,

Thanks for the review.

This is a good question. This piece of information was missing from
the FLIP document. If I understand correctly, "From the source" refers
to a source that determines the backlog status based on its own state.
Both backlog statuses, the one based on watermark lag and the one based
on source state, are determined at the source operator. Only when both
backlog statuses are false does the source operator send the
"isBacklog=false" signal downstream. In other words, the backlog
statuses are combined using a logical OR. I have updated the
configuration description to describe this behavior.
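
The combination rule described above can be sketched as follows. This is only an illustration in plain Java; `BacklogStatusCombiner` and `combine` are hypothetical names, not part of the Flink API or of the FLIP.

```java
// Hypothetical helper, not Flink API: combines the two backlog signals
// that are evaluated at the source operator.
final class BacklogStatusCombiner {
    private BacklogStatusCombiner() {}

    // Logical OR: the operator reports isBacklog=false downstream only
    // when both individual statuses are false.
    static boolean combine(boolean backlogFromSourceState,
                           boolean backlogFromWatermarkLag) {
        return backlogFromSourceState || backlogFromWatermarkLag;
    }
}
```

For the case asked about in this thread (the source reports `isProcessingBacklog=false` while the watermark lag exceeds the threshold), `combine(false, true)` yields true under this rule.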

I hope that addresses your question.

Best,
Xuannan


On Wed, Aug 30, 2023 at 4:31 PM Hang Ruan  wrote:
>
> Hi, Xuannan.
>
> Thanks for preparing the FLIP.
>
> After this FLIP, we will have two ways to report isProcessingBacklog: 1.
> From the source; 2. Judged by the watermark lag. What is the priority
> between them?
> For example, what is the status isProcessingBacklog when the source report
> `isProcessingBacklog=false` and the watermark lag exceeds the threshold?
>
> Best,
> Hang
>
> On Wed, Aug 30, 2023 at 10:06 AM Xuannan Su  wrote:
>
> > Hi Jing,
> >
> > Thank you for the suggestion.
> >
> > The definition of watermark lag is the same as the watermarkLag metric in
> > FLIP-33[1]. More specifically, the watermark lag calculation is computed at
> > the time when a watermark is emitted downstream in the following way:
> > watermarkLag = CurrentTime - Watermark. I have added this description to
> > the FLIP.
> >
> > I hope this addresses your concern.
> >
> > Best,
> > Xuannan
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> >
> >
> > > On Aug 28, 2023, at 01:04, Jing Ge  wrote:
> > >
> > > Hi Xuannan,
> > >
> > > Thanks for the proposal. +1 for me.
> > >
> > > There is one tiny thing that I am not sure if I understand it correctly.
> > > Since there will be many different WatermarkStrategies and different
> > > WatermarkGenerators. Could you please update the FLIP and add the
> > > description of how the watermark lag is calculated exactly? E.g.
> > Watermark
> > > lag = A - B with A is the timestamp of the watermark emitted to the
> > > downstream and B is(this is the part I am not really sure after
> > reading
> > > the FLIP).
> > >
> > > Best regards,
> > > Jing
> > >
> > >
> > > On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su 
> > wrote:
> > >
> > >> Hi Jark,
> > >>
> > >> Thanks for the comments.
> > >>
> > >> I agree that the current solution cannot support jobs that cannot define
> > >> watermarks. However, after considering the pending-record-based
> > solution, I
> > >> believe the current solution is superior for the target use case as it
> > is
> > >> more intuitive for users. The backlog status gives users the ability to
> > >> balance between throughput and latency. Making this trade-off decision
> > >> based on the watermark lag is more intuitive from the user's
> > perspective.
> > >> For instance, a user can decide that if the job lags behind the current
> > >> time by more than 1 hour, the result is not usable. In that case, we can
> > >> optimize for throughput when the data lags behind by more than an hour.
> > >> With the pending-record-based solution, it's challenging for users to
> > >> determine when to optimize for throughput and when to prioritize
> > latency.
> > >>
> > >> Regarding the limitations of the watermark-based solution:
> > >>
> > >> 1. The current solution can support jobs with sources that have event
> > >> time. Users can always define a watermark at the source operator, even
> > if
> > >> it's not used by downstream operators, such as streaming join and
> > unbounded
> > >> aggregate.
> > >>
> > >> 2.I don't believe it's accurate to say that the watermark lag will keep
> > >> increasing if no data is generated in Kafka. The watermark lag and
> > backlog
> > >> status are determined at the moment when the watermark is emitted to the
> > >> downstream operator. If no data is emitted from the source, the
> > watermark
> > >> lag and backlog status will not be updated. If the WatermarkStrategy
> > with
> > >> idleness is used, the source becomes non-backlog when it becomes idle.
> > >>
> > >> 3. I think watermark lag is more intuitive to determine if a job is
> > >> processing backlog data. Even when using pending records, it faces a
> > >> similar issue. For example, if the source has 1K pending records, those
> > >> records can span from 1 day  to 1 hour to 1 second. If the records span
> > 1
> > >> day, it's probably best to optimize for throughput. If they span 1
> > hour, it
> > >> depends on the business logic. If they span 1 second, optimizing for
> > >> latency is likely the better choice.
> > >>
> > >> In summary, I believe the watermark-based solution is a superior choice
> > >> for the target use case where watermark/event time can be defined.
> > >> Additionally, I haven't 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-30 Thread Xuannan Su
Hi Jing,

Thanks for the reply.

1. You are absolutely right that the watermark lag threshold must be carefully 
set with a thorough understanding of watermark generation. It is crucial for 
users to take into account the WatermarkStrategy when setting the watermark lag 
threshold.

2. Regarding pure processing-time based stream processing jobs, alternative 
strategies will be implemented to determine whether the job is processing 
backlog data. I have outlined two possible strategies below:

- Based on the source operator's state. For example, when the MySQL CDC source
is reading a snapshot, it can claim isBacklog=true.
- Based on metrics. For example, when busyTimeMsPerSecond (or
backPressuredTimeMsPerSecond) > user_specified_threshold, then isBacklog=true.

The strategy proposed in this FLIP relies on generated watermarks.
Therefore, if a user wants the job to detect backlog status based on
watermarks, watermarks must be generated.

3. I'm afraid I'm not fully grasping your question. From my understanding, it 
should work in both cases. When event times are close to the processing time, 
resulting in watermarks close to the processing time, the job is not processing 
backlog data. On the other hand, when event times are far from processing time, 
causing watermarks to also be distant, if the lag surpasses the defined 
threshold, the job is considered processing backlog data.
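
A minimal sketch of the check described in point 3, using the definition quoted elsewhere in this thread (watermarkLag = CurrentTime - Watermark). The class `WatermarkLagBacklogDetector` and its methods are hypothetical and written in plain Java; the real implementation in Flink may differ, and the strict "greater than" comparison is an assumption based on the wording "surpasses the defined threshold".

```java
// Hypothetical helper, not Flink API: decides the backlog status from
// the watermark lag at the moment a watermark is emitted downstream.
final class WatermarkLagBacklogDetector {
    private final long thresholdMillis;

    WatermarkLagBacklogDetector(long thresholdMillis) {
        this.thresholdMillis = thresholdMillis;
    }

    // watermarkLag = CurrentTime - Watermark (both in epoch milliseconds).
    static long watermarkLagMillis(long currentTimeMillis, long watermarkMillis) {
        return currentTimeMillis - watermarkMillis;
    }

    // Assumption: backlog only when the lag strictly exceeds the threshold.
    boolean isBacklog(long currentTimeMillis, long watermarkMillis) {
        return watermarkLagMillis(currentTimeMillis, watermarkMillis) > thresholdMillis;
    }
}
```

With a one-hour threshold, a watermark one second behind the wall clock gives no backlog, while a watermark two hours behind gives backlog.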

Best,
Xuannan


> On Aug 31, 2023, at 02:56, Jing Ge  wrote:
> 
> Hi Xuannan,
> 
> Thanks for the clarification. That is the part where I am trying to
> understand your thoughts. I have some follow-up questions:
> 
> 1. It depends strongly on the watermarkStrategy and how customized
> watermark generation looks like. It mixes business logic with technical
> implementation and technical data processing mode. The value of the
> watermark lag threshold must be set very carefully. If the value is too
> small. any time, when the watermark generation logic is changed(business
> logic changes lead to the threshold getting exceeded), the same job might
> be running surprisingly in backlog processing mode, i.e. a butterfly
> effect. A comprehensive documentation is required to avoid any confusion
> for the users.
> 2. Like Jark already mentioned, use cases that do not have watermarks,
> like pure processing-time based stream processing[1] are not covered. It is
> more or less a trade-off solution that does not support such use cases and
> appropriate documentation is required. Forcing them to explicitly generate
> watermarks that are never needed just because of this does not sound like a
> proper solution.
> 3. If I am not mistaken, it only works for use cases where event times are
> very close to the processing times, because the wall clock is used to
> calculate the watermark lag and the watermark is generated based on the
> event time.
> 
> Best regards,
> Jing
> 
> [1]
> https://github.com/apache/flink/blob/2c50b4e956305426f478b726d4de4a640a16b810/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L236
> 
> On Wed, Aug 30, 2023 at 4:06 AM Xuannan Su  wrote:
> 
>> Hi Jing,
>> 
>> Thank you for the suggestion.
>> 
>> The definition of watermark lag is the same as the watermarkLag metric in
>> FLIP-33[1]. More specifically, the watermark lag calculation is computed at
>> the time when a watermark is emitted downstream in the following way:
>> watermarkLag = CurrentTime - Watermark. I have added this description to
>> the FLIP.
>> 
>> I hope this addresses your concern.
>> 
>> Best,
>> Xuannan
>> 
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>> 
>> 
>>> On Aug 28, 2023, at 01:04, Jing Ge  wrote:
>>> 
>>> Hi Xuannan,
>>> 
>>> Thanks for the proposal. +1 for me.
>>> 
>>> There is one tiny thing that I am not sure if I understand it correctly.
>>> Since there will be many different WatermarkStrategies and different
>>> WatermarkGenerators. Could you please update the FLIP and add the
>>> description of how the watermark lag is calculated exactly? E.g.
>> Watermark
>>> lag = A - B with A is the timestamp of the watermark emitted to the
>>> downstream and B is(this is the part I am not really sure after
>> reading
>>> the FLIP).
>>> 
>>> Best regards,
>>> Jing
>>> 
>>> 
>>> On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su 
>> wrote:
>>> 
>>>> Hi Jark,
>>>>
>>>> Thanks for the comments.
>>>>
>>>> I agree that the current solution cannot support jobs that cannot define
>>>> watermarks. However, after considering the pending-record-based
>> solution, I
>>>> believe the current solution is superior for the target use case as it
>> is
>>>> more intuitive for users. The backlog status gives users the ability to
>>>> balance between throughput and latency. Making this trade-off decision
>>>> based on the watermark lag is more intuitive from the user's
>> perspective.
>>>> For instance, a user can decide that if the job lags behind the 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-30 Thread Jing Ge
Hi Xuannan,

Thanks for the clarification. That is the part where I am trying to
understand your thoughts. I have some follow-up questions:

1. It depends strongly on the WatermarkStrategy and on what the customized
watermark generation looks like. It mixes business logic with technical
implementation and the technical data processing mode. The value of the
watermark lag threshold must be set very carefully. If the value is too
small, then any time the watermark generation logic changes (business
logic changes that cause the threshold to be exceeded), the same job might
unexpectedly run in backlog processing mode, i.e. a butterfly
effect. Comprehensive documentation is required to avoid confusing
users.
2. As Jark already mentioned, use cases that do not have watermarks,
such as pure processing-time based stream processing[1], are not covered.
It is more or less a trade-off solution that does not support such use
cases, and appropriate documentation is required. Forcing users to
explicitly generate watermarks that are never needed, just for this
purpose, does not sound like a proper solution.
3. If I am not mistaken, it only works for use cases where event times are
very close to the processing times, because the wall clock is used to
calculate the watermark lag and the watermark is generated based on the
event time.
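
To make the concern in point 3 concrete, here is a small worked example in plain Java. It assumes the lag is computed as wall-clock time minus watermark, and it uses made-up dates; `ReplayLagExample` is a hypothetical name, not Flink API.

```java
// Hypothetical example, not Flink API: lag seen by a job that replays
// historical data, where the watermark follows old event times.
final class ReplayLagExample {
    private ReplayLagExample() {}

    // Whole days between the watermark and the wall clock (ISO-8601 instants).
    static long lagDays(String wallClockIso, String watermarkIso) {
        java.time.Instant wallClock = java.time.Instant.parse(wallClockIso);
        java.time.Instant watermark = java.time.Instant.parse(watermarkIso);
        return java.time.Duration.between(watermark, wallClock).toDays();
    }

    // True when the lag strictly exceeds the threshold.
    static boolean exceeds(long lagDays, long thresholdDays) {
        return lagDays > thresholdDays;
    }
}
```

For a wall clock of 2023-08-30 and a watermark of 2023-01-01, the lag is 241 days, so any threshold of hours or a few days is exceeded for the whole replay, regardless of how fast the job keeps up with its input.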

Best regards,
Jing

[1]
https://github.com/apache/flink/blob/2c50b4e956305426f478b726d4de4a640a16b810/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L236

On Wed, Aug 30, 2023 at 4:06 AM Xuannan Su  wrote:

> Hi Jing,
>
> Thank you for the suggestion.
>
> The definition of watermark lag is the same as the watermarkLag metric in
> FLIP-33[1]. More specifically, the watermark lag calculation is computed at
> the time when a watermark is emitted downstream in the following way:
> watermarkLag = CurrentTime - Watermark. I have added this description to
> the FLIP.
>
> I hope this addresses your concern.
>
> Best,
> Xuannan
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>
>
> > On Aug 28, 2023, at 01:04, Jing Ge  wrote:
> >
> > Hi Xuannan,
> >
> > Thanks for the proposal. +1 for me.
> >
> > There is one tiny thing that I am not sure if I understand it correctly.
> > Since there will be many different WatermarkStrategies and different
> > WatermarkGenerators. Could you please update the FLIP and add the
> > description of how the watermark lag is calculated exactly? E.g.
> Watermark
> > lag = A - B with A is the timestamp of the watermark emitted to the
> > downstream and B is(this is the part I am not really sure after
> reading
> > the FLIP).
> >
> > Best regards,
> > Jing
> >
> >
> > On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su 
> wrote:
> >
> >> Hi Jark,
> >>
> >> Thanks for the comments.
> >>
> >> I agree that the current solution cannot support jobs that cannot define
> >> watermarks. However, after considering the pending-record-based
> solution, I
> >> believe the current solution is superior for the target use case as it
> is
> >> more intuitive for users. The backlog status gives users the ability to
> >> balance between throughput and latency. Making this trade-off decision
> >> based on the watermark lag is more intuitive from the user's
> perspective.
> >> For instance, a user can decide that if the job lags behind the current
> >> time by more than 1 hour, the result is not usable. In that case, we can
> >> optimize for throughput when the data lags behind by more than an hour.
> >> With the pending-record-based solution, it's challenging for users to
> >> determine when to optimize for throughput and when to prioritize
> latency.
> >>
> >> Regarding the limitations of the watermark-based solution:
> >>
> >> 1. The current solution can support jobs with sources that have event
> >> time. Users can always define a watermark at the source operator, even
> if
> >> it's not used by downstream operators, such as streaming join and
> unbounded
> >> aggregate.
> >>
> >> 2.I don't believe it's accurate to say that the watermark lag will keep
> >> increasing if no data is generated in Kafka. The watermark lag and
> backlog
> >> status are determined at the moment when the watermark is emitted to the
> >> downstream operator. If no data is emitted from the source, the
> watermark
> >> lag and backlog status will not be updated. If the WatermarkStrategy
> with
> >> idleness is used, the source becomes non-backlog when it becomes idle.
> >>
> >> 3. I think watermark lag is more intuitive to determine if a job is
> >> processing backlog data. Even when using pending records, it faces a
> >> similar issue. For example, if the source has 1K pending records, those
> >> records can span from 1 day  to 1 hour to 1 second. If the records span
> 1
> >> day, it's probably best to optimize for throughput. If they span 1
> hour, it
> >> depends on the business logic. If they span 1 second, optimizing for
> >> 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-30 Thread Hang Ruan
Hi, Xuannan.

Thanks for preparing the FLIP.

After this FLIP, we will have two ways to report isProcessingBacklog: 1.
From the source; 2. Judged by the watermark lag. What is the priority
between them?
For example, what is the status of isProcessingBacklog when the source reports
`isProcessingBacklog=false` and the watermark lag exceeds the threshold?
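
To make the question concrete, here is a minimal Java sketch of two ways the signals could be combined. Neither rule is taken from the FLIP; the class and method names (`BacklogPriority`, `sourceOverrides`, `eitherSignal`) are hypothetical and exist only to show where the two readings diverge.

```java
/** Hypothetical sketch: two candidate rules for combining the two backlog signals. */
final class BacklogPriority {

    /** Rule A (assumed): the value reported by the source always wins. */
    static boolean sourceOverrides(boolean sourceReported, boolean lagExceedsThreshold) {
        return sourceReported;
    }

    /** Rule B (assumed): the job is in backlog if either signal says so. */
    static boolean eitherSignal(boolean sourceReported, boolean lagExceedsThreshold) {
        return sourceReported || lagExceedsThreshold;
    }

    public static void main(String[] args) {
        // The case in question: source reports false, watermark lag exceeds the threshold.
        System.out.println(sourceOverrides(false, true)); // false
        System.out.println(eitherSignal(false, true));    // true
    }
}
```

The two rules agree in every case except this one, which is why the priority needs to be stated explicitly.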

Best,
Hang

On Wed, Aug 30, 2023 at 10:06, Xuannan Su wrote:

> Hi Jing,
>
> Thank you for the suggestion.
>
> The definition of watermark lag is the same as the watermarkLag metric in
> FLIP-33[1]. More specifically, the watermark lag is computed at
> the time when a watermark is emitted downstream in the following way:
> watermarkLag = CurrentTime - Watermark. I have added this description to
> the FLIP.
>
> I hope this addresses your concern.
>
> Best,
> Xuannan
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>
>
> > On Aug 28, 2023, at 01:04, Jing Ge  wrote:
> >
> > Hi Xuannan,
> >
> > Thanks for the proposal. +1 for me.
> >
> > There is one tiny thing that I am not sure if I understand it correctly.
> > Since there will be many different WatermarkStrategies and different
> > WatermarkGenerators. Could you please update the FLIP and add the
> > description of how the watermark lag is calculated exactly? E.g.
> Watermark
> > lag = A - B with A is the timestamp of the watermark emitted to the
> > downstream and B is(this is the part I am not really sure after
> reading
> > the FLIP).
> >
> > Best regards,
> > Jing
> >
> >
> > On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su 
> wrote:
> >
> >> Hi Jark,
> >>
> >> Thanks for the comments.
> >>
> >> I agree that the current solution cannot support jobs that cannot define
> >> watermarks. However, after considering the pending-record-based
> solution, I
> >> believe the current solution is superior for the target use case as it
> is
> >> more intuitive for users. The backlog status gives users the ability to
> >> balance between throughput and latency. Making this trade-off decision
> >> based on the watermark lag is more intuitive from the user's
> perspective.
> >> For instance, a user can decide that if the job lags behind the current
> >> time by more than 1 hour, the result is not usable. In that case, we can
> >> optimize for throughput when the data lags behind by more than an hour.
> >> With the pending-record-based solution, it's challenging for users to
> >> determine when to optimize for throughput and when to prioritize
> latency.
> >>
> >> Regarding the limitations of the watermark-based solution:
> >>
> >> 1. The current solution can support jobs with sources that have event
> >> time. Users can always define a watermark at the source operator, even
> if
> >> it's not used by downstream operators, such as streaming join and
> unbounded
> >> aggregate.
> >>
> >> 2. I don't believe it's accurate to say that the watermark lag will keep
> >> increasing if no data is generated in Kafka. The watermark lag and
> backlog
> >> status are determined at the moment when the watermark is emitted to the
> >> downstream operator. If no data is emitted from the source, the
> watermark
> >> lag and backlog status will not be updated. If the WatermarkStrategy
> with
> >> idleness is used, the source becomes non-backlog when it becomes idle.
> >>
> >> 3. I think watermark lag is more intuitive to determine if a job is
> >> processing backlog data. Even when using pending records, it faces a
> >> similar issue. For example, if the source has 1K pending records, those
> >> records can span from 1 day to 1 hour to 1 second. If the records span
> 1
> >> day, it's probably best to optimize for throughput. If they span 1
> hour, it
> >> depends on the business logic. If they span 1 second, optimizing for
> >> latency is likely the better choice.
> >>
> >> In summary, I believe the watermark-based solution is a superior choice
> >> for the target use case where watermark/event time can be defined.
> >> Additionally, I haven't come across a scenario that requires low-latency
> >> processing and reads from a source that cannot define watermarks. If we
> >> encounter such a use case, we can create another FLIP to address those
> >> needs in the future. What do you think?
> >>
> >>
> >> Best,
> >> Xuannan
> >>
> >>
> >>
> >>> On Aug 20, 2023, at 23:27, Jark Wu <imj...@gmail.com> wrote:
> >>>
> >>> Hi Xuannan,
> >>>
> >>> Thanks for opening this discussion.
> >>>
> >>> This current proposal may work in the mentioned watermark cases.
> >>> However, it seems this is not a general solution for sources to
> determine
> >>> "isProcessingBacklog".
> >>> From my point of view, there are 3 limitations of the current proposal:
> >>> 1. It doesn't cover jobs that don't have watermark/event-time defined,
> >>> for example streaming join and unbounded aggregate. We may still need
> to
> >>> figure out solutions for them.
> >>> 2. Watermark lag can not be trusted, because it increases 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-29 Thread Xuannan Su
Hi Jing,

Thank you for the suggestion.

The definition of watermark lag is the same as the watermarkLag metric in
FLIP-33[1]. More specifically, the watermark lag is computed at the time when
a watermark is emitted downstream in the following way: watermarkLag =
CurrentTime - Watermark. I have added this description to the FLIP.
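
As a minimal illustration of the formula, the following Java sketch computes the lag and compares it with a threshold. The class and method names are hypothetical, and the strict "greater than" comparison is an assumption; this is not the actual Flink implementation.

```java
import java.time.Duration;

/** Sketch of the lag formula above; names are illustrative only. */
final class WatermarkLag {

    /** watermarkLag = CurrentTime - Watermark, both in epoch milliseconds. */
    static Duration lag(long currentTimeMillis, long watermarkMillis) {
        return Duration.ofMillis(currentTimeMillis - watermarkMillis);
    }

    /** Assumed rule: the lag counts as backlog when it is strictly above the threshold. */
    static boolean exceeds(Duration lag, Duration threshold) {
        return lag.compareTo(threshold) > 0;
    }

    public static void main(String[] args) {
        long now = 1_693_360_000_000L; // wall-clock time when the watermark is emitted
        long watermark = now - Duration.ofMinutes(90).toMillis();
        Duration observed = lag(now, watermark);
        System.out.println(observed.toMinutes());                   // 90
        System.out.println(exceeds(observed, Duration.ofHours(1))); // true
    }
}
```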

I hope this addresses your concern.

Best, 
Xuannan

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics


> On Aug 28, 2023, at 01:04, Jing Ge  wrote:
> 
> Hi Xuannan,
> 
> Thanks for the proposal. +1 for me.
> 
> There is one tiny thing that I am not sure if I understand it correctly.
> Since there will be many different WatermarkStrategies and different
> WatermarkGenerators. Could you please update the FLIP and add the
> description of how the watermark lag is calculated exactly? E.g. Watermark
> lag = A - B with A is the timestamp of the watermark emitted to the
> downstream and B is(this is the part I am not really sure after reading
> the FLIP).
> 
> Best regards,
> Jing
> 
> 
> On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su  wrote:
> 
>> Hi Jark,
>> 
>> Thanks for the comments.
>> 
>> I agree that the current solution cannot support jobs that cannot define
>> watermarks. However, after considering the pending-record-based solution, I
>> believe the current solution is superior for the target use case as it is
>> more intuitive for users. The backlog status gives users the ability to
>> balance between throughput and latency. Making this trade-off decision
>> based on the watermark lag is more intuitive from the user's perspective.
>> For instance, a user can decide that if the job lags behind the current
>> time by more than 1 hour, the result is not usable. In that case, we can
>> optimize for throughput when the data lags behind by more than an hour.
>> With the pending-record-based solution, it's challenging for users to
>> determine when to optimize for throughput and when to prioritize latency.
>> 
>> Regarding the limitations of the watermark-based solution:
>> 
>> 1. The current solution can support jobs with sources that have event
>> time. Users can always define a watermark at the source operator, even if
>> it's not used by downstream operators, such as streaming join and unbounded
>> aggregate.
>> 
>> 2. I don't believe it's accurate to say that the watermark lag will keep
>> increasing if no data is generated in Kafka. The watermark lag and backlog
>> status are determined at the moment when the watermark is emitted to the
>> downstream operator. If no data is emitted from the source, the watermark
>> lag and backlog status will not be updated. If the WatermarkStrategy with
>> idleness is used, the source becomes non-backlog when it becomes idle.
>> 
>> 3. I think watermark lag is more intuitive to determine if a job is
>> processing backlog data. Even when using pending records, it faces a
>> similar issue. For example, if the source has 1K pending records, those
>> records can span from 1 day to 1 hour to 1 second. If the records span 1
>> day, it's probably best to optimize for throughput. If they span 1 hour, it
>> depends on the business logic. If they span 1 second, optimizing for
>> latency is likely the better choice.
>> 
>> In summary, I believe the watermark-based solution is a superior choice
>> for the target use case where watermark/event time can be defined.
>> Additionally, I haven't come across a scenario that requires low-latency
>> processing and reads from a source that cannot define watermarks. If we
>> encounter such a use case, we can create another FLIP to address those
>> needs in the future. What do you think?
>> 
>> 
>> Best,
>> Xuannan
>> 
>> 
>> 
>>> On Aug 20, 2023, at 23:27, Jark Wu <imj...@gmail.com> wrote:
>>> 
>>> Hi Xuannan,
>>> 
>>> Thanks for opening this discussion.
>>> 
>>> This current proposal may work in the mentioned watermark cases.
>>> However, it seems this is not a general solution for sources to determine
>>> "isProcessingBacklog".
>>> From my point of view, there are 3 limitations of the current proposal:
>>> 1. It doesn't cover jobs that don't have watermark/event-time defined,
>>> for example streaming join and unbounded aggregate. We may still need to
>>> figure out solutions for them.
>>> 2. Watermark lag cannot be trusted, because it increases without bound if no
>>> data is generated in Kafka.
>>> But in this case, there is no backlog at all.
>>> 3. Watermark lag is hard to reflect the amount of backlog. If the
>> watermark
>>> lag is 1day or 1 hour or 1second,
>>> there is possibly only 1 pending record there, which means no backlog at
>>> all.
>>> 
>>> Therefore, IMO, watermark may not be the ideal metric to determine
>>> "isProcessingBacklog".
>>> What we need is something that reflects the number of records unprocessed
>>> by the job.
>>> Actually, that is the "pendingRecords" metric proposed in FLIP-33 and has
>>> been implemented by 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-27 Thread Jing Ge
Hi Xuannan,

Thanks for the proposal. +1 for me.

There is one small thing I am not sure I understand correctly.
Since there will be many different WatermarkStrategies and
WatermarkGenerators, could you please update the FLIP and add a
description of exactly how the watermark lag is calculated? E.g. watermark
lag = A - B, where A is the timestamp of the watermark emitted
downstream and B is (this is the part I am not sure about after reading
the FLIP).

Best regards,
Jing


On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su  wrote:

> Hi Jark,
>
> Thanks for the comments.
>
> I agree that the current solution cannot support jobs that cannot define
> watermarks. However, after considering the pending-record-based solution, I
> believe the current solution is superior for the target use case as it is
> more intuitive for users. The backlog status gives users the ability to
> balance between throughput and latency. Making this trade-off decision
> based on the watermark lag is more intuitive from the user's perspective.
> For instance, a user can decide that if the job lags behind the current
> time by more than 1 hour, the result is not usable. In that case, we can
> optimize for throughput when the data lags behind by more than an hour.
> With the pending-record-based solution, it's challenging for users to
> determine when to optimize for throughput and when to prioritize latency.
>
> Regarding the limitations of the watermark-based solution:
>
> 1. The current solution can support jobs with sources that have event
> time. Users can always define a watermark at the source operator, even if
> it's not used by downstream operators, such as streaming join and unbounded
> aggregate.
>
> 2. I don't believe it's accurate to say that the watermark lag will keep
> increasing if no data is generated in Kafka. The watermark lag and backlog
> status are determined at the moment when the watermark is emitted to the
> downstream operator. If no data is emitted from the source, the watermark
> lag and backlog status will not be updated. If the WatermarkStrategy with
> idleness is used, the source becomes non-backlog when it becomes idle.
>
> 3. I think watermark lag is more intuitive to determine if a job is
> processing backlog data. Even when using pending records, it faces a
> similar issue. For example, if the source has 1K pending records, those
> records can span from 1 day to 1 hour to 1 second. If the records span 1
> day, it's probably best to optimize for throughput. If they span 1 hour, it
> depends on the business logic. If they span 1 second, optimizing for
> latency is likely the better choice.
>
> In summary, I believe the watermark-based solution is a superior choice
> for the target use case where watermark/event time can be defined.
> Additionally, I haven't come across a scenario that requires low-latency
> processing and reads from a source that cannot define watermarks. If we
> encounter such a use case, we can create another FLIP to address those
> needs in the future. What do you think?
>
>
> Best,
> Xuannan
>
>
>
> > On Aug 20, 2023, at 23:27, Jark Wu <imj...@gmail.com> wrote:
> >
> > Hi Xuannan,
> >
> > Thanks for opening this discussion.
> >
> > This current proposal may work in the mentioned watermark cases.
> > However, it seems this is not a general solution for sources to determine
> > "isProcessingBacklog".
> > From my point of view, there are 3 limitations of the current proposal:
> > 1. It doesn't cover jobs that don't have watermark/event-time defined,
> > for example streaming join and unbounded aggregate. We may still need to
> > figure out solutions for them.
> > 2. Watermark lag cannot be trusted, because it increases without bound if no
> > data is generated in Kafka.
> > But in this case, there is no backlog at all.
> > 3. Watermark lag is hard to reflect the amount of backlog. If the
> watermark
> > lag is 1day or 1 hour or 1second,
> > there is possibly only 1 pending record there, which means no backlog at
> > all.
> >
> > Therefore, IMO, watermark may not be the ideal metric to determine
> > "isProcessingBacklog".
> > What we need is something that reflects the number of records unprocessed
> > by the job.
> > Actually, that is the "pendingRecords" metric proposed in FLIP-33 and has
> > been implemented by Kafka source.
> > Did you consider using "pendingRecords" metric to determine
> > "isProcessingBacklog"?
> >
> > Best,
> > Jark
> >
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> >
> >
> >
> > On Tue, 15 Aug 2023 at 12:04, Xintong Song wrote:
> >
> >> Sounds good to me.
> >>
> >> It is true that, if we are introducing the generalized watermark, there
> >> will be other watermark related concepts / configurations that need to
> be
> >> updated anyway.

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-21 Thread Xuannan Su
Hi Jark,

Thanks for the comments.

I agree that the current solution cannot support jobs that cannot define 
watermarks. However, after considering the pending-record-based solution, I 
believe the current solution is superior for the target use case as it is more 
intuitive for users. The backlog status gives users the ability to balance 
between throughput and latency. Making this trade-off decision based on the 
watermark lag is more intuitive from the user's perspective. For instance, a 
user can decide that if the job lags behind the current time by more than 1 
hour, the result is not usable. In that case, we can optimize for throughput 
when the data lags behind by more than an hour. With the pending-record-based 
solution, it's challenging for users to determine when to optimize for 
throughput and when to prioritize latency.

Regarding the limitations of the watermark-based solution:

1. The current solution can support jobs with sources that have event time. 
Users can always define a watermark at the source operator, even if it's not 
used by downstream operators, such as streaming join and unbounded aggregate.

2. I don't believe it's accurate to say that the watermark lag will keep
increasing if no data is generated in Kafka. The watermark lag and backlog 
status are determined at the moment when the watermark is emitted to the 
downstream operator. If no data is emitted from the source, the watermark lag 
and backlog status will not be updated. If the WatermarkStrategy with idleness 
is used, the source becomes non-backlog when it becomes idle.

3. I think watermark lag is a more intuitive way to determine whether a job is
processing backlog data. The pending-records approach faces a similar issue. For
example, if the source has 1K pending records, those records can span 1 day,
1 hour, or 1 second. If the records span 1 day, it's probably best to
optimize for throughput. If they span 1 hour, it depends on the business logic.
If they span 1 second, optimizing for latency is likely the better choice.
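
The behavior described in point 2 can be sketched in Java as a small state holder that recomputes the backlog status only when a watermark is emitted and clears it when the source becomes idle. `BacklogTracker` and its methods are hypothetical names for illustration, not Flink classes, and the strict threshold comparison is an assumption.

```java
import java.time.Duration;

/** Hypothetical tracker: status changes only on watermark emission or idleness. */
final class BacklogTracker {
    private final Duration threshold;
    private boolean backlog; // false until a watermark says otherwise

    BacklogTracker(Duration threshold) {
        this.threshold = threshold;
    }

    /** Called only when a watermark is emitted downstream. */
    void onWatermarkEmitted(long currentTimeMillis, long watermarkMillis) {
        backlog = currentTimeMillis - watermarkMillis > threshold.toMillis();
    }

    /** Called when a WatermarkStrategy with idleness marks the source idle. */
    void onIdle() {
        backlog = false;
    }

    boolean isBacklog() {
        return backlog;
    }

    public static void main(String[] args) {
        BacklogTracker tracker = new BacklogTracker(Duration.ofHours(1));
        tracker.onWatermarkEmitted(Duration.ofHours(2).toMillis(), 0L); // lag of 2 hours
        System.out.println(tracker.isBacklog()); // true
        // No new watermark arrives: nothing is recomputed, so the status is unchanged.
        System.out.println(tracker.isBacklog()); // true
        tracker.onIdle();
        System.out.println(tracker.isBacklog()); // false
    }
}
```

Note that in this sketch wall-clock time passing alone never changes the status, which is the point made against the "lag keeps increasing" concern.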

In summary, I believe the watermark-based solution is a superior choice for the 
target use case where watermark/event time can be defined. Additionally, I 
haven't come across a scenario that requires low-latency processing and reads 
from a source that cannot define watermarks. If we encounter such a use case, 
we can create another FLIP to address those needs in the future. What do you 
think?


Best,
Xuannan



> On Aug 20, 2023, at 23:27, Jark Wu wrote:
> 
> Hi Xuannan,
> 
> Thanks for opening this discussion.
> 
> This current proposal may work in the mentioned watermark cases.
> However, it seems this is not a general solution for sources to determine
> "isProcessingBacklog".
> From my point of view, there are 3 limitations of the current proposal:
> 1. It doesn't cover jobs that don't have watermark/event-time defined,
> for example streaming join and unbounded aggregate. We may still need to
> figure out solutions for them.
> 2. Watermark lag cannot be trusted, because it increases without bound if no
> data is generated in Kafka.
> But in this case, there is no backlog at all.
> 3. Watermark lag is hard to reflect the amount of backlog. If the watermark
> lag is 1day or 1 hour or 1second,
> there is possibly only 1 pending record there, which means no backlog at
> all.
> 
> Therefore, IMO, watermark may not be the ideal metric to determine
> "isProcessingBacklog".
> What we need is something that reflects the number of records unprocessed
> by the job.
> Actually, that is the "pendingRecords" metric proposed in FLIP-33 and has
> been implemented by Kafka source.
> Did you consider using "pendingRecords" metric to determine
> "isProcessingBacklog"?
> 
> Best,
> Jark
> 
> 
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>  
> 
> 
> 
> 
> On Tue, 15 Aug 2023 at 12:04, Xintong Song wrote:
> 
>> Sounds good to me.
>> 
>> It is true that, if we are introducing the generalized watermark, there
>> will be other watermark related concepts / configurations that need to be
>> updated anyway.
>> 
>> 
>> Best,
>> 
>> Xintong
>> 
>> 
>> 
>> On Tue, Aug 15, 2023 at 11:30 AM Xuannan Su wrote:
>> 
>>> Hi Xintong,
>>> 
>>> Thank you for your suggestion.
>>> 
>>> After considering the idea of using a general configuration key, I think
>>> it may not be a good idea for the reasons below.
>>> 
>>> While I agree that using a more general configuration key provides us
>> with
>>> the flexibility to switch to other approaches to calculate the lag in the
>>> future, the downside is that it may cause confusion for users. We
>> currently
>>> have fetchEventTimeLag, emitEventTimeLag, and watermarkLag in the source,
>>> and it is not clear which specific lag we are 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-20 Thread Jark Wu
Hi Xuannan,

Thanks for opening this discussion.

This current proposal may work in the mentioned watermark cases.
However, it seems this is not a general solution for sources to determine
"isProcessingBacklog".
From my point of view, there are 3 limitations of the current proposal:
1. It doesn't cover jobs that don't have watermark/event-time defined,
for example streaming join and unbounded aggregate. We may still need to
figure out solutions for them.
2. Watermark lag cannot be trusted, because it increases without bound if no
data is generated in Kafka.
But in this case, there is no backlog at all.
3. Watermark lag does not reflect the amount of backlog well. Even if the
watermark lag is 1 day, 1 hour, or 1 second,
there may be only 1 pending record, which means no backlog at
all.

Therefore, IMO, watermark may not be the ideal metric for determining
"isProcessingBacklog".
What we need is something that reflects the number of records unprocessed
by the job.
Actually, that is the "pendingRecords" metric proposed in FLIP-33 [1], which has
been implemented by the Kafka source.
Did you consider using the "pendingRecords" metric to determine
"isProcessingBacklog"?
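
Limitation 3 can be illustrated with a small Java sketch that evaluates both signals on the same source state. The class name, method names, and both thresholds are hypothetical; in particular, a record-count threshold is only an assumed way to use "pendingRecords" and is not something proposed in the FLIP.

```java
import java.time.Duration;

/** Hypothetical comparison of a lag-based and a record-count-based backlog check. */
final class BacklogSignals {

    /** Backlog if the watermark lag is strictly above the lag threshold. */
    static boolean byWatermarkLag(Duration watermarkLag, Duration lagThreshold) {
        return watermarkLag.compareTo(lagThreshold) > 0;
    }

    /** Backlog if more records are pending than the (assumed) record threshold. */
    static boolean byPendingRecords(long pendingRecords, long recordThreshold) {
        return pendingRecords > recordThreshold;
    }

    public static void main(String[] args) {
        // A single old record: large watermark lag, but almost nothing left to process.
        Duration watermarkLag = Duration.ofDays(1);
        long pendingRecords = 1;
        System.out.println(byWatermarkLag(watermarkLag, Duration.ofHours(1))); // true
        System.out.println(byPendingRecords(pendingRecords, 1_000));           // false
    }
}
```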

Best,
Jark


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics



On Tue, 15 Aug 2023 at 12:04, Xintong Song  wrote:

> Sounds good to me.
>
> It is true that, if we are introducing the generalized watermark, there
> will be other watermark related concepts / configurations that need to be
> updated anyway.
>
>
> Best,
>
> Xintong
>
>
>
> On Tue, Aug 15, 2023 at 11:30 AM Xuannan Su  wrote:
>
> > Hi Xintong,
> >
> > Thank you for your suggestion.
> >
> > After considering the idea of using a general configuration key, I think
> > it may not be a good idea for the reasons below.
> >
> > While I agree that using a more general configuration key provides us
> with
> > the flexibility to switch to other approaches to calculate the lag in the
> > future, the downside is that it may cause confusion for users. We
> currently
> > have fetchEventTimeLag, emitEventTimeLag, and watermarkLag in the source,
> > and it is not clear which specific lag we are referring to. With the
> > potential introduction of the Generalized Watermark mechanism in the
> > future, if I understand correctly, a watermark won't necessarily need to
> be
> > a timestamp. I am concerned that the general configuration key may not be
> > enough to cover all the use cases and we will need to introduce a general
> > way to determine the backlog status regardless.
> >
> > For the reasons above, I prefer introducing the configuration as is, and
> > changing it later with a deprecation or migration process. What
> > do you think?
> >
> > Best,
> > Xuannan
> > On Aug 14, 2023, 14:09 +0800, Xintong Song wrote:
> > > Thanks for the explanation.
> > >
> > > I wonder if it makes sense to not expose this detail via the
> > configuration
> > > option. To be specific, I suggest not mentioning the "watermark"
> keyword
> > in
> > > the configuration key and description.
> > >
> > > - From the users' perspective, I think they only need to know there's a
> > > lag higher than the given threshold, Flink will consider latency of
> > > individual records as less important and prioritize throughput over it.
> > > They don't really need the details of how the lags are calculated.
> > > - For the internal implementation, I also think using watermark lags is
> > > a good idea, for the reasons you've already mentioned. However, it's
> not
> > > the only possible option. Hiding this detail from users would give us
> the
> > > flexibility to switch to other approaches if needed in future.
> > > - We are currently working on designing the ProcessFunction API
> > > (consider it as a DataStream API V2). There's an idea to introduce a
> > > Generalized Watermark mechanism, where basically the watermark can be
> > > anything that needs to travel along the data-flow with certain
> alignment
> > > strategies, and event time watermark would be one specific case of it.
> > This
> > > is still an idea and has not been discussed and agreed on by the
> > community,
> > > and we are preparing a FLIP for it. But if we are going for it, the
> > concept
> > > "watermark-lag-threshold" could be ambiguous.
> > >
> > > I do not intend to block the FLIP on this. I'd also be fine with
> > > introducing the configuration as is, and changing it later, if needed,
> > with
> > > a regular deprecation and migration process. Just making my
> suggestions.
> > >
> > >
> > > Best,
> > >
> > > Xintong
> > >
> > >
> > >
> > > On Mon, Aug 14, 2023 at 12:00 PM Xuannan Su 
> > wrote:
> > >
> > > > Hi Xintong,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > I have considered using the timestamp in the records to determine the
> > > > backlog status, and decided to use watermark at the end. By
> definition,
> > > > watermark is the time progress indication in the data stream. It
> > indicates
> 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-14 Thread Xintong Song
Sounds good to me.

It is true that, if we are introducing the generalized watermark, there
will be other watermark related concepts / configurations that need to be
updated anyway.


Best,

Xintong



On Tue, Aug 15, 2023 at 11:30 AM Xuannan Su  wrote:

> Hi Xintong,
>
> Thank you for your suggestion.
>
> After considering the idea of using a general configuration key, I think
> it may not be a good idea for the reasons below.
>
> While I agree that using a more general configuration key provides us with
> the flexibility to switch to other approaches to calculate the lag in the
> future, the downside is that it may cause confusion for users. We currently
> have fetchEventTimeLag, emitEventTimeLag, and watermarkLag in the source,
> and it is not clear which specific lag we are referring to. With the
> potential introduction of the Generalized Watermark mechanism in the
> future, if I understand correctly, a watermark won't necessarily need to be
> a timestamp. I am concerned that the general configuration key may not be
> enough to cover all the use cases and we will need to introduce a general
> way to determine the backlog status regardless.
>
> For the reasons above, I prefer introducing the configuration as is, and
> changing it later with a deprecation or migration process. What
> do you think?
>
> Best,
> Xuannan
> On Aug 14, 2023, 14:09 +0800, Xintong Song wrote:
> > Thanks for the explanation.
> >
> > I wonder if it makes sense to not expose this detail via the
> configuration
> > option. To be specific, I suggest not mentioning the "watermark" keyword
> in
> > the configuration key and description.
> >
> > - From the users' perspective, I think they only need to know there's a
> > lag higher than the given threshold, Flink will consider latency of
> > individual records as less important and prioritize throughput over it.
> > They don't really need the details of how the lags are calculated.
> > - For the internal implementation, I also think using watermark lags is
> > a good idea, for the reasons you've already mentioned. However, it's not
> > the only possible option. Hiding this detail from users would give us the
> > flexibility to switch to other approaches if needed in future.
> > - We are currently working on designing the ProcessFunction API
> > (consider it as a DataStream API V2). There's an idea to introduce a
> > Generalized Watermark mechanism, where basically the watermark can be
> > anything that needs to travel along the data-flow with certain alignment
> > strategies, and event time watermark would be one specific case of it.
> This
> > is still an idea and has not been discussed and agreed on by the
> community,
> > and we are preparing a FLIP for it. But if we are going for it, the
> concept
> > "watermark-lag-threshold" could be ambiguous.
> >
> > I do not intend to block the FLIP on this. I'd also be fine with
> > introducing the configuration as is, and changing it later, if needed,
> with
> > a regular deprecation and migration process. Just making my suggestions.
> >
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Mon, Aug 14, 2023 at 12:00 PM Xuannan Su 
> wrote:
> >
> > > Hi Xintong,
> > >
> > > Thanks for the reply.
> > >
> > > I have considered using the timestamp in the records to determine the
> > > backlog status, and decided to use watermark at the end. By definition,
> > > watermark is the time progress indication in the data stream. It
> indicates
> > > the stream’s event time has progressed to some specific time. On the
> other
> > > hand, timestamp in the records is usually used to generate the
> watermark.
> > > Therefore, it appears more appropriate and intuitive to calculate the
> event
> > > time lag by watermark and determine the backlog status. And by using
> the
> > > watermark, we can easily deal with the out-of-order and the idleness
> of the
> > > data.
> > >
> > > Please let me know if you have further questions.
> > >
> > > Best,
> > > Xuannan
> > > On Aug 10, 2023, 20:23 +0800, Xintong Song wrote:
> > > > Thanks for preparing the FLIP, Xuannan.
> > > >
> > > > +1 in general.
> > > >
> > > > A quick question, could you explain why we are relying on the
> watermark
> > > for
> > > > emitting the record attribute? Why not use timestamps in the
> records? I
> > > > don't see any concern in using watermarks. Just wondering if there's
> any
> > > > deep considerations behind this.
> > > >
> > > > Best,
> > > >
> > > > Xintong
> > > >
> > > >
> > > >
> > > > On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su 
> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I am opening this thread to discuss FLIP-328: Allow source operators to
> > > > > determine isProcessingBacklog based on watermark lag[1]. We had
> > > > > several discussions with Dong Lin about the design, and thanks for all the
> > > > > valuable advice.
> > > > >
> > > > > The FLIP aims to target the use-case where user want to run a Flink
> > > job to
> > > > > backfill 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-14 Thread Xuannan Su
Hi Xintong,

Thank you for your suggestion.

After considering the idea of using a general configuration key, I think it may 
not be a good idea for the reasons below.

While I agree that using a more general configuration key gives us the
flexibility to switch to other approaches to calculate the lag in the future,
the downside is that it may confuse users. We currently have
fetchEventTimeLag, emitEventTimeLag, and watermarkLag in the source, and it
would not be clear which specific lag we are referring to. With the potential
introduction of the Generalized Watermark mechanism in the future, if I
understand correctly, a watermark won't necessarily need to be a timestamp. I
am concerned that the general configuration key may not be enough to cover all
the use cases, and we will need to introduce a general way to determine the
backlog status regardless.
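
To show why a generic "lag" key would be ambiguous, here is a minimal Java sketch of the three source lags side by side. The formulas follow my reading of the FLIP-33 metric definitions (fetch time minus event time, emit time minus event time, current time minus watermark); the class and method names are hypothetical and the timestamps are made-up values.

```java
/** Sketch of three different "lags" a source can report; names are illustrative. */
final class SourceLags {

    /** Time between the record's event time and the moment the source fetched it. */
    static long fetchEventTimeLag(long fetchTimeMillis, long eventTimeMillis) {
        return fetchTimeMillis - eventTimeMillis;
    }

    /** Time between the record's event time and the moment the source emitted it. */
    static long emitEventTimeLag(long emitTimeMillis, long eventTimeMillis) {
        return emitTimeMillis - eventTimeMillis;
    }

    /** Time between the latest emitted watermark and the current wall-clock time. */
    static long watermarkLag(long currentTimeMillis, long watermarkMillis) {
        return currentTimeMillis - watermarkMillis;
    }

    public static void main(String[] args) {
        long eventTime = 10_000, fetchTime = 10_500, emitTime = 10_800;
        long watermark = 9_000, now = 11_000;
        System.out.println(fetchEventTimeLag(fetchTime, eventTime)); // 500
        System.out.println(emitEventTimeLag(emitTime, eventTime));   // 800
        System.out.println(watermarkLag(now, watermark));            // 2000
    }
}
```

The same source state yields three different values, so a threshold only has a clear meaning once the key names the lag it applies to.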

For the reasons above, I prefer introducing the configuration as is, and
changing it later with a deprecation or migration process. What do you think?

Best,
Xuannan
On Aug 14, 2023, 14:09 +0800, Xintong Song wrote:
> Thanks for the explanation.
>
> I wonder if it makes sense to not expose this detail via the configuration
> option. To be specific, I suggest not mentioning the "watermark" keyword in
> the configuration key and description.
>
> - From the users' perspective, I think they only need to know that when the
> lag is higher than the given threshold, Flink will consider the latency of
> individual records less important and prioritize throughput over it.
> They don't really need the details of how the lag is calculated.
> - For the internal implementation, I also think using watermark lags is
> a good idea, for the reasons you've already mentioned. However, it's not
> the only possible option. Hiding this detail from users would give us the
> flexibility to switch to other approaches if needed in the future.
> - We are currently working on designing the ProcessFunction API
> (consider it as a DataStream API V2). There's an idea to introduce a
> Generalized Watermark mechanism, where basically the watermark can be
> anything that needs to travel along the data-flow with certain alignment
> strategies, and event time watermark would be one specific case of it. This
> is still an idea and has not been discussed and agreed on by the community,
> and we are preparing a FLIP for it. But if we are going for it, the concept
> "watermark-lag-threshold" could be ambiguous.
>
> I do not intend to block the FLIP on this. I'd also be fine with
> introducing the configuration as is, and changing it later, if needed, with
> a regular deprecation and migration process. Just making my suggestions.
>
>
> Best,
>
> Xintong
>
>
>
> On Mon, Aug 14, 2023 at 12:00 PM Xuannan Su  wrote:
>
> > Hi Xintong,
> >
> > Thanks for the reply.
> >
> > I have considered using the timestamp in the records to determine the
> > backlog status, and decided to use the watermark in the end. By definition,
> > the watermark is the indication of time progress in the data stream: it
> > indicates that the stream’s event time has progressed to some specific time.
> > The timestamp in the records, on the other hand, is usually used to generate
> > the watermark. Therefore, it appears more appropriate and intuitive to
> > calculate the event time lag from the watermark and use it to determine the
> > backlog status. And by using the watermark, we can easily deal with
> > out-of-order data and idleness.
> >
> > Please let me know if you have further questions.
> >
> > Best,
> > Xuannan
> > On Aug 10, 2023, 20:23 +0800, Xintong Song wrote:
> > > Thanks for preparing the FLIP, Xuannan.
> > >
> > > +1 in general.
> > >
> > > A quick question, could you explain why we are relying on the watermark for
> > > emitting the record attribute? Why not use timestamps in the records? I
> > > don't see any concern in using watermarks. Just wondering if there are any
> > > deep considerations behind this.
> > >
> > > Best,
> > >
> > > Xintong
> > >
> > >
> > >
> > > On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su  wrote:
> > >
> > > > Hi all,
> > > >
> > > > I am opening this thread to discuss FLIP-328: Allow source operators to
> > > > determine isProcessingBacklog based on watermark lag[1]. We had several
> > > > discussions with Dong Lin about the design, and thanks for all the
> > > > valuable advice.
> > > >
> > > > The FLIP targets the use case where users want to run a Flink job to
> > > > backfill historical data in a high-throughput manner and then continue
> > > > processing real-time data with low latency. Building upon the backlog
> > > > concept introduced in FLIP-309[2], this proposal enables sources to report
> > > > their backlog-processing status based on the watermark lag.
> > > >
> > > > We would greatly appreciate any comments or feedback you may have on this
> > > > proposal.
> > > >
> > > > Best,
> > > > Xuannan
> > > >
> > > >
> > > > [1]
> > > >
> > 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-14 Thread Xintong Song
Thanks for the explanation.

I wonder if it makes sense to not expose this detail via the configuration
option. To be specific, I suggest not mentioning the "watermark" keyword in
the configuration key and description.

   - From the users' perspective, I think they only need to know that when the
   lag is higher than the given threshold, Flink will consider the latency of
   individual records less important and prioritize throughput over it.
   They don't really need the details of how the lag is calculated.
   - For the internal implementation, I also think using watermark lags is
   a good idea, for the reasons you've already mentioned. However, it's not
   the only possible option. Hiding this detail from users would give us the
   flexibility to switch to other approaches if needed in the future.
   - We are currently working on designing the ProcessFunction API
   (consider it as a DataStream API V2). There's an idea to introduce a
   Generalized Watermark mechanism, where basically the watermark can be
   anything that needs to travel along the data-flow with certain alignment
   strategies, and event time watermark would be one specific case of it. This
   is still an idea and has not been discussed and agreed on by the community,
   and we are preparing a FLIP for it. But if we are going for it, the concept
   "watermark-lag-threshold" could be ambiguous.

I do not intend to block the FLIP on this. I'd also be fine with
introducing the configuration as is, and changing it later, if needed, with
a regular deprecation and migration process. Just making my suggestions.
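
To make the "introduce as is, deprecate later" path concrete, here is a
minimal sketch of a lookup that prefers a generic key and falls back to the
watermark-specific one. It is plain Java and not Flink's configuration API.
The generic key name "pipeline.backlog.lag-threshold" is purely hypothetical,
and the class and method names are made up for illustration.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical lookup for illustration only; not Flink's configuration API. */
final class BacklogLagThresholdLookup {

    // Hypothetical generic key that hides how the lag is calculated.
    static final String GENERIC_KEY = "pipeline.backlog.lag-threshold";

    // Watermark-specific key, treated here as the deprecated fallback.
    static final String DEPRECATED_KEY = "pipeline.backlog.watermark-lag-threshold";

    /** Returns the configured threshold, preferring the generic key. */
    static Optional<Duration> lagThreshold(Map<String, Duration> conf) {
        Duration value = conf.get(GENERIC_KEY);
        if (value == null) {
            // Jobs that still use the old key keep working during migration.
            value = conf.get(DEPRECATED_KEY);
        }
        return Optional.ofNullable(value);
    }

    public static void main(String[] args) {
        Map<String, Duration> oldStyleJob = new HashMap<>();
        oldStyleJob.put(DEPRECATED_KEY, Duration.ofMinutes(5));
        System.out.println(lagThreshold(oldStyleJob));
    }
}
```

The point of the sketch is only that a later rename does not have to break
existing jobs, as long as the old key is still honored for a while.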


Best,

Xintong



On Mon, Aug 14, 2023 at 12:00 PM Xuannan Su  wrote:

> Hi Xintong,
>
> Thanks for the reply.
>
> I have considered using the timestamp in the records to determine the
> backlog status, and decided to use the watermark in the end. By definition,
> the watermark is the indication of time progress in the data stream: it
> indicates that the stream’s event time has progressed to some specific time.
> The timestamp in the records, on the other hand, is usually used to generate
> the watermark. Therefore, it appears more appropriate and intuitive to
> calculate the event time lag from the watermark and use it to determine the
> backlog status. And by using the watermark, we can easily deal with
> out-of-order data and idleness.
>
> Please let me know if you have further questions.
>
> Best,
> Xuannan
> On Aug 10, 2023, 20:23 +0800, Xintong Song wrote:
> > Thanks for preparing the FLIP, Xuannan.
> >
> > +1 in general.
> >
> > A quick question, could you explain why we are relying on the watermark for
> > emitting the record attribute? Why not use timestamps in the records? I
> > don't see any concern in using watermarks. Just wondering if there are any
> > deep considerations behind this.
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su  wrote:
> >
> > > Hi all,
> > >
> > > I am opening this thread to discuss FLIP-328: Allow source operators to
> > > determine isProcessingBacklog based on watermark lag[1]. We had several
> > > discussions with Dong Lin about the design, and thanks for all the
> > > valuable advice.
> > >
> > > The FLIP targets the use case where users want to run a Flink job to
> > > backfill historical data in a high-throughput manner and then continue
> > > processing real-time data with low latency. Building upon the backlog
> > > concept introduced in FLIP-309[2], this proposal enables sources to report
> > > their backlog-processing status based on the watermark lag.
> > >
> > > We would greatly appreciate any comments or feedback you may have on this
> > > proposal.
> > >
> > > Best,
> > > Xuannan
> > >
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> > > [2]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > >
>


Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-13 Thread Xuannan Su
Hi Xintong,

Thanks for the reply.

I have considered using the timestamp in the records to determine the backlog 
status, and decided to use the watermark in the end. By definition, the 
watermark is the indication of time progress in the data stream: it indicates 
that the stream’s event time has progressed to some specific time. The 
timestamp in the records, on the other hand, is usually used to generate the 
watermark. Therefore, it appears more appropriate and intuitive to calculate 
the event time lag from the watermark and use it to determine the backlog 
status. And by using the watermark, we can easily deal with out-of-order data 
and idleness.
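
As a rough illustration of the out-of-order point, here is a minimal sketch 
that derives the backlog status from a simplified bounded-out-of-orderness 
watermark (largest event time seen so far minus a fixed bound). The class and 
method names are hypothetical, this is not the interface proposed in the 
FLIP, and returning false before any record has arrived is an assumption of 
this sketch.

```java
import java.time.Duration;

/** Hypothetical sketch for illustration only; not the interface proposed in the FLIP. */
final class WatermarkLagBacklogSketch {

    private final long maxOutOfOrdernessMs;
    private final long lagThresholdMs;
    private long maxEventTimeMs = Long.MIN_VALUE;

    WatermarkLagBacklogSketch(Duration maxOutOfOrderness, Duration lagThreshold) {
        this.maxOutOfOrdernessMs = maxOutOfOrderness.toMillis();
        this.lagThresholdMs = lagThreshold.toMillis();
    }

    /** Tracks the largest event time seen, so a late record never moves the watermark back. */
    void onRecord(long eventTimeMs) {
        maxEventTimeMs = Math.max(maxEventTimeMs, eventTimeMs);
    }

    /** The source is considered to be processing backlog when the watermark lag exceeds the threshold. */
    boolean isProcessingBacklog(long nowMs) {
        if (maxEventTimeMs == Long.MIN_VALUE) {
            return false; // assumption: no watermark yet means no backlog
        }
        long watermarkMs = maxEventTimeMs - maxOutOfOrdernessMs;
        return nowMs - watermarkMs > lagThresholdMs;
    }

    public static void main(String[] args) {
        WatermarkLagBacklogSketch sketch =
                new WatermarkLagBacklogSketch(Duration.ofSeconds(5), Duration.ofMinutes(1));
        long nowMs = 1_000_000L;
        sketch.onRecord(nowMs - 600_000L); // old record: watermark lag is above the threshold
        System.out.println(sketch.isProcessingBacklog(nowMs));
        sketch.onRecord(nowMs - 10_000L);  // recent record: watermark catches up
        System.out.println(sketch.isProcessingBacklog(nowMs));
        sketch.onRecord(nowMs - 500_000L); // late, out-of-order record: watermark is unchanged
        System.out.println(sketch.isProcessingBacklog(nowMs));
    }
}
```

A check based on the timestamp of each individual record would flip back to 
backlog on the late record in the last step, while the watermark-based check 
stays stable.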

Please let me know if you have further questions.

Best,
Xuannan
On Aug 10, 2023, 20:23 +0800, Xintong Song wrote:
> Thanks for preparing the FLIP, Xuannan.
>
> +1 in general.
>
> A quick question, could you explain why we are relying on the watermark for
> emitting the record attribute? Why not use timestamps in the records? I
> don't see any concern in using watermarks. Just wondering if there are any
> deep considerations behind this.
>
> Best,
>
> Xintong
>
>
>
> On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su  wrote:
>
> > Hi all,
> >
> > I am opening this thread to discuss FLIP-328: Allow source operators to
> > determine isProcessingBacklog based on watermark lag[1]. We had several
> > discussions with Dong Lin about the design, and thanks for all the
> > valuable advice.
> >
> > The FLIP targets the use case where users want to run a Flink job to
> > backfill historical data in a high-throughput manner and then continue
> > processing real-time data with low latency. Building upon the backlog
> > concept introduced in FLIP-309[2], this proposal enables sources to report
> > their backlog-processing status based on the watermark lag.
> >
> > We would greatly appreciate any comments or feedback you may have on this
> > proposal.
> >
> > Best,
> > Xuannan
> >
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> > [2]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> >


Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-10 Thread Xintong Song
Thanks for preparing the FLIP, Xuannan.

+1 in general.

A quick question, could you explain why we are relying on the watermark for
emitting the record attribute? Why not use timestamps in the records? I
don't see any concern in using watermarks. Just wondering if there are any
deep considerations behind this.

Best,

Xintong



On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su  wrote:

> Hi all,
>
> I am opening this thread to discuss FLIP-328: Allow source operators to
> determine isProcessingBacklog based on watermark lag[1]. We had several
> discussions with Dong Lin about the design, and thanks for all the
> valuable advice.
>
> The FLIP targets the use case where users want to run a Flink job to
> backfill historical data in a high-throughput manner and then continue
> processing real-time data with low latency. Building upon the backlog
> concept introduced in FLIP-309[2], this proposal enables sources to report
> their backlog-processing status based on the watermark lag.
>
> We would greatly appreciate any comments or feedback you may have on this
> proposal.
>
> Best,
> Xuannan
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
>