Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-25 Thread Dong Lin
Hi Piotr,

Thanks for the comments. Let me try to explain it below.

Overall, the two competing options differ in how an invocation of
`#setIsProcessingBacklog(false)` affects the backlog status for the given
source (corresponding to the SplitEnumeratorContext instance on which this
method is invoked).

- With my approach, setIsProcessingBacklog(false) merely unsets the effect of
any previous invocation of setIsProcessingBacklog(..) on the given source,
without necessarily forcing the source's backlog status to be false.
- With Jark’s approach, setIsProcessingBacklog(false) forces the source's
backlog status to be false.

There is no practical difference between these two options as of FLIP-309.
However, once we introduce additional strategies (e.g. a job-level config) to
configure backlog status in FLIP-328, there will be tricky and important
differences between them.

More specifically, let’s say we want to introduce a job-level config such
as “pipeline.backlog.watermark-lag-threshold” as mentioned in FLIP-328:

- With Jark’s approach, if MySQL CDC invokes setIsProcessingBacklog(false)
at the beginning of the “unbounded phase”, then that effectively means
isProcessingBacklog=false even if watermark lag exceeds the configured
threshold, preventing the job-level config from taking effect during the
“unbounded phase”.
- With my approach, even if MySQL CDC invokes setIsProcessingBacklog(false)
at the beginning of the “unbounded phase”, the source can still have
isProcessingBacklog=true when watermark lag is too high.
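To make the difference concrete, here is a minimal Java sketch of how a single source's backlog status could be resolved under each option. It is a hypothetical model, not Flink code: the class `BacklogSemanticsSketch`, its methods, and the assumption that under option 2 the lag threshold only applies when the source never called setIsProcessingBacklog(..) are all illustrative.

```java
import java.time.Duration;
import java.util.Optional;

/** Hypothetical model of one source's backlog status under the two options (not Flink API). */
public class BacklogSemanticsSketch {

    // Shared helper: is the lag known, a threshold configured, and the lag above it?
    static boolean lagExceeds(Optional<Duration> lag, Optional<Duration> threshold) {
        return lag.isPresent() && threshold.isPresent() && lag.get().compareTo(threshold.get()) > 0;
    }

    /** Option 1: false only undoes an earlier true; other strategies can still say true. */
    public static boolean option1(Optional<Boolean> lastInvocation,
                                  Optional<Duration> lag, Optional<Duration> threshold) {
        return lastInvocation.orElse(false) || lagExceeds(lag, threshold);
    }

    /** Option 2 (assumed model): an explicit call pins the status; the lag rule applies only without one. */
    public static boolean option2(Optional<Boolean> lastInvocation,
                                  Optional<Duration> lag, Optional<Duration> threshold) {
        return lastInvocation.orElseGet(() -> lagExceeds(lag, threshold));
    }

    public static void main(String[] args) {
        // MySQL CDC calls setIsProcessingBacklog(false) when the "unbounded phase" starts,
        // while its watermark lags 30 minutes behind, with a 10-minute job-level threshold.
        Optional<Boolean> calledFalse = Optional.of(false);
        Optional<Duration> lag = Optional.of(Duration.ofMinutes(30));
        Optional<Duration> threshold = Optional.of(Duration.ofMinutes(10));
        System.out.println(option1(calledFalse, lag, threshold)); // true
        System.out.println(option2(calledFalse, lag, threshold)); // false

        // With no threshold configured (FLIP-309 only), the two options agree.
        System.out.println(option1(calledFalse, lag, Optional.empty())
                == option2(calledFalse, lag, Optional.empty())); // true
    }
}
```

Without a lag threshold, both methods return the same value for every input, which matches the observation that the options only diverge once an additional strategy exists.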

Does this clarify the difference between these two options?

Regards,
Dong



2023-09-25 Thread Piotr Nowojski
Hi Jark and Dong,

I'm a bit confused about the difference between the two competing options.
Could one of you elaborate on the difference between:
> 2) The semantics of `#setIsProcessingBacklog(false)` is that it overrides
> the effect of the previous invocation (if any) of
> `#setIsProcessingBacklog(true)` on the given source instance.

and

> 2) The semantics of `#setIsProcessingBacklog(false)` is that the given
> source instance will have backlog=false.

?

Best,
Piotrek


2023-09-21 Thread Dong Lin
Hi all,

Jark and I discussed this FLIP offline, and I will summarize our discussion
below. It would be great if you could provide your opinion on the proposed
options.

Regarding the target use-cases:
- We both agreed that MySQL CDC should have backlog=true when watermarkLag
is large during the binlog phase.
- Dong argued that other streaming sources with watermarkLag defined (e.g.
Kafka) should also have backlog=true when watermarkLag is large. The
pros/cons discussion below assumes this use-case needs to be supported.

The 1st option is what is currently proposed in FLIP-328, with the
following key characteristics:
1) There is one job-level config (i.e.
pipeline.backlog.watermark-lag-threshold) that applies to all sources with
watermarkLag metric defined.
2) The semantics of `#setIsProcessingBacklog(false)` is that it overrides
the effect of the previous invocation (if any) of
`#setIsProcessingBacklog(true)` on the given source instance.

The 2nd option is what Jark proposed in this email thread, with the
following key characteristics:
1) Add source-specific config (both Java API and SQL source property) to
every source for which we want to set backlog status based on the
watermarkLag metric. For example, we might add separate Java APIs
`#setWatermarkLagThreshold` for MySQL CDC source, HybridSource,
KafkaSource, PulsarSource etc.
2) The semantics of `#setIsProcessingBacklog(false)` is that the given
source instance will have backlog=false.
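A hypothetical sketch of where each option would look up the lag threshold for a source, assuming option 1 reads the single job-level pipeline.backlog.watermark-lag-threshold value and option 2 reads a per-source value, such as one set through the proposed `#setWatermarkLagThreshold`. The class, method, and source names are illustrative only.

```java
import java.time.Duration;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch: where each option would find a source's lag threshold. */
public class LagThresholdLookupSketch {

    /** Option 1: one job-level value shared by every source that reports watermarkLag. */
    public static Optional<Duration> option1Threshold(Optional<Duration> jobLevelThreshold,
                                                      String sourceName) {
        // The source name is irrelevant: all sources share the same threshold.
        return jobLevelThreshold;
    }

    /** Option 2: each source carries its own value, configured individually by the user. */
    public static Optional<Duration> option2Threshold(Map<String, Duration> perSourceThresholds,
                                                      String sourceName) {
        // Sources without their own entry never switch to backlog based on lag.
        return Optional.ofNullable(perSourceThresholds.get(sourceName));
    }

    public static void main(String[] args) {
        Optional<Duration> jobLevel = Optional.of(Duration.ofMinutes(10));
        Map<String, Duration> perSource = Map.of("mysql-cdc", Duration.ofMinutes(10));
        for (String source : new String[] {"mysql-cdc", "kafka"}) {
            System.out.println(source + ": option1=" + option1Threshold(jobLevel, source)
                    + ", option2=" + option2Threshold(perSource, source));
        }
    }
}
```

With one configured entry each, the Kafka source gets a threshold under option 1 but none under option 2 until the user configures it separately, which is the extra end-user work listed below.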

Here are the key pros/cons of these two options.

Cons of the 1st option:
1) The semantics of `#setIsProcessingBacklog(false)` is harder to
understand for Flink operator developers than the corresponding semantics
in option-2.

Cons of the 2nd option:
1) More work for end-users. For a job with multiple sources that need to be
configured with a watermark lag threshold, users need to specify multiple
configs (one for each source) instead of specifying one job-level config.

2) More work for Flink operator developers. Overall there are more public
APIs (one Java API and one SQL property for each source that needs to
determine backlog based on watermark) exposed to end users. This also adds
to the Flink community's burden of maintaining these APIs.

3) It would be hard (e.g. it would require a backward incompatible API
change) to extend the Flink runtime to support a job-level config for a
watermark-based backlog strategy in the future (e.g. support the
pipeline.backlog.watermark-lag-threshold in option-1). This is because an
existing source operator's code might have hardcoded an invocation of
`#setIsProcessingBacklog(false)`, which under option-2 semantics forces the
backlog status to false and thus prevents the Flink runtime from setting
backlog=true when a new strategy is triggered.

Overall, I am still inclined to choose option-1 because it is more
extensible and simpler to use in the long term when we want to support/use
multiple sources whose backlog status can change based on the watermark
lag. While option-1's `#setIsProcessingBacklog` is a bit harder to
understand than option-2, I think this overhead/cost is worthwhile as it
makes end-users' life easier in the long term.

Jark: thank you for taking the time to review this FLIP. Please feel free
to comment if I missed anything in the pros/cons above.

Jark and I have not reached agreement on which option is better. It would be
really helpful to get more comments on these options.

Thanks,
Dong



2023-09-18 Thread Dong Lin
Hi Jark,

Thanks for the reply. Please see my comments inline.

On Tue, Sep 19, 2023 at 10:12 AM Jark Wu  wrote:

> Hi Dong,
>
> Sorry for the late reply.
>
> > The rationale is that if there is any strategy that is triggered and says
> > backlog=true, then the job's backlog should be true. Otherwise, the job's
> > backlog status is false.
>
> I'm quite confused about this. Does that mean, if the source is in the
> changelog phase, the source has to continuously invoke
> "setIsProcessingBacklog(true)" (in an infinite loop?). Otherwise,
> the job's backlog status would be set to false by the framework?
>

No, the source would not have to continuously invoke
setIsProcessingBacklog(true) in an infinite loop.

Actually, I am not sure where the impression that "the job's backlog status
would be set to false by the framework" comes from. Could you explain?

I guess it might be useful to provide a complete overview of how
setIsProcessingBacklog(...) and pipeline.backlog.watermark-lag-threshold
work together to determine the overall job's backlog status. Let me explain
it below.

Here is the semantics/behavior of setIsProcessingBacklog(..).
- This method is invoked on a per-source basis.
- This method can be invoked multiple times with true/false as its input
parameter.
- For a given source, the last invocation of this method overwrites the
effect of earlier invocations of this method.
- For a given source, if this method has been invoked at least once and the
last invocation of this method has isProcessingBacklog = true, then it is
guaranteed that the backlog status of this source is set to true.

Here is the semantics/behavior of pipeline.backlog.watermark-lag-threshold:
- This config is specified at the job level and applies to every source
included in this job.
- For a given source, if its watermarkLag metric is available (see
FLIP-33) and watermarkLag > watermark-lag-threshold, then it is guaranteed
that the backlog status of this source is set to true.

Here is how the source's backlog status is determined: if a rule specified
above says the source's backlog status should be true, then the source's
backlog status is set to true. Otherwise, it is set to false.

Here is how the job's backlog status is determined: if there exists a source
that is currently running and the source's backlog status is set to true,
then the job's backlog status is set to true. Otherwise, it is set to false.
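The rules above can be summarized as a small Java sketch. This is a hypothetical model to illustrate the described semantics, not Flink's implementation; `SourceState`, its fields, and the use of `Duration` for watermarkLag are assumptions.

```java
import java.time.Duration;
import java.util.List;
import java.util.Optional;

/** Hypothetical model of the backlog rules described above; not Flink's implementation. */
public class BacklogStatusRules {

    /** Illustrative per-source inputs. */
    public static final class SourceState {
        final String name;
        final boolean running;
        // Argument of the last setIsProcessingBacklog(..) call, if any call was made.
        final Optional<Boolean> lastInvocation;
        // watermarkLag metric (see FLIP-33), if the source reports one.
        final Optional<Duration> watermarkLag;

        public SourceState(String name, boolean running,
                           Optional<Boolean> lastInvocation, Optional<Duration> watermarkLag) {
            this.name = name;
            this.running = running;
            this.lastInvocation = lastInvocation;
            this.watermarkLag = watermarkLag;
        }
    }

    /** A source is in backlog iff at least one rule says so. */
    public static boolean sourceBacklog(SourceState s, Optional<Duration> lagThreshold) {
        // Rule 1: the last setIsProcessingBacklog(..) invocation passed true.
        boolean invokedTrue = s.lastInvocation.orElse(false);
        // Rule 2: watermarkLag is available and exceeds the job-level threshold.
        boolean lagExceeded = s.watermarkLag.isPresent() && lagThreshold.isPresent()
                && s.watermarkLag.get().compareTo(lagThreshold.get()) > 0;
        return invokedTrue || lagExceeded;
    }

    /** The job is in backlog iff some currently running source is in backlog. */
    public static boolean jobBacklog(List<SourceState> sources, Optional<Duration> lagThreshold) {
        return sources.stream().anyMatch(s -> s.running && sourceBacklog(s, lagThreshold));
    }

    public static void main(String[] args) {
        Optional<Duration> threshold = Optional.of(Duration.ofMinutes(10));
        // MySQL CDC last called setIsProcessingBacklog(false), but its watermark lags 30 minutes.
        SourceState cdc = new SourceState("mysql-cdc", true, Optional.of(false),
                Optional.of(Duration.ofMinutes(30)));
        // A caught-up Kafka source that never called setIsProcessingBacklog(..).
        SourceState kafka = new SourceState("kafka", true, Optional.empty(),
                Optional.of(Duration.ofSeconds(5)));
        System.out.println(jobBacklog(List.of(cdc, kafka), threshold)); // true
        System.out.println(jobBacklog(List.of(kafka), threshold));      // false
    }
}
```

Note that nothing in this model lets a call to setIsProcessingBacklog(false) override another rule; it can only withdraw the source's own earlier "true".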

Hopefully this can help clarify the behavior. If needed, we can update the
relevant doc (e.g. setIsProcessingBacklog() Java doc) to make semantics
above clearer for Flink users.

And I would be happy to discuss/explain this design offline when you have
time.

Thanks,
Dong


>
> Best,
> Jark
>

2023-09-18 Thread Jark Wu
Hi Dong,

Sorry for the late reply.

> The rationale is that if there is any strategy that is triggered and says
> backlog=true, then the job's backlog should be true. Otherwise, the job's
> backlog status is false.

I'm quite confused about this. Does that mean, if the source is in the
changelog phase, the source has to continuously invoke
"setIsProcessingBacklog(true)" (in an infinite loop?). Otherwise,
the job's backlog status would be set to false by the framework?

Best,
Jark


2023-09-18 Thread Dong Lin
Hi Jark,

Do you have time to comment on whether the current design looks good?

I plan to start voting in 3 days if there is no follow-up comment.

Thanks,
Dong


On Fri, Sep 15, 2023 at 2:01 PM Jark Wu  wrote:

> Hi Dong,
>
> > Note that we cannot simply enforce the semantics of "any invocation of
> > setIsProcessingBacklog(false) will set the job's backlog status to false".
> > Suppose we have a job with two operators, where operatorA invokes
> > setIsProcessingBacklog(false) and operatorB invokes
> > setIsProcessingBacklog(true). There will be a conflict if we use the
> > semantics of "any invocation of setIsProcessingBacklog(false) will set the
> > job's backlog status to false".
>
> So it should set the job's backlog status to false if the job has only a
> single source, right?
>
> Could you elaborate on the behavior if there is a job with a single source,
> and the watermark lag exceeds the configured value (should it set backlog
> to true?), but the source invokes "setIsProcessingBacklog(false)"? Or the
> inverse case: the source invokes "setIsProcessingBacklog(false)" first, and
> then the watermark lag exceeds the configured value.
>
> This is the conflict I'm concerned about.
>
> Best,
> Jark
>
> On Fri, 15 Sept 2023 at 12:00, Dong Lin  wrote:
>
> > Hi Jark,
> >
> > Please see my comments inline.
> >
> > On Fri, Sep 15, 2023 at 10:35 AM Jark Wu  wrote:
> >
> > > Hi Dong,
> > >
> > > Please see my comments inline below.
> >
> >
> > > > Hmm.. can you explain what you mean by "different watermark delay
> > > > definitions for each source"?
> > >
> > > For example, "table1" defines a watermark with delay 5 seconds,
> > > "table2" defines a watermark with delay 10 seconds. They have different
> > > watermark delay definitions. So it is also reasonable for them to have
> > > different watermark lag definitions, e.g., "table1" allows "10mins" and
> > > "table2" allows "20mins".
> > >
> >
> > I think the watermark delay you mentioned above is conceptually /
> > fundamentally different from the watermark-lag-threshold proposed in this
> > FLIP.
> >
> > It might be useful to revisit the semantics of these two concepts:
> > - watermark delay is used to account for the maximum amount of
> > out-of-orderness that users expect (or are willing to wait for) for
> > records from a given source.
> > - watermark-lag-threshold is used to define when processing latency is no
> > longer important (e.g. because data is already stale).
> >
> > Even though users might expect different degrees of out-of-orderness for
> > different sources, users do not necessarily have different definitions /
> > thresholds for when a record is considered "already stale".
> >
> >
> > >
> > > > I think there is probably a misunderstanding here. FLIP-309 does NOT
> > > > directly specify when backlog is false. It is intentionally specified
> > > > in such a way that there will not be any conflict between these rules.
> > >
> > > Do you mean FLIP-309 doesn't allow to specify backlog to be false?
> > > Is this mentioned in FLIP-309? This is completely different from what I
> > >
> >
> > Can you explain what you mean by "allow to specify backlog to be false"?
> >
> > If what you mean is "can invoke setIsProcessingBacklog(false)", then
> > FLIP-309 supports doing this.
> >
> > If what you mean is "any invocation of setIsProcessingBacklog(false) will
> > set the job's backlog status to false", then FLIP-309 does not support
> > this. I believe the existing Java doc of this API and FLIP-309 are
> > compatible with this explanation.
> >
> > Note that we cannot simply enforce the semantics of "any invocation of
> > setIsProcessingBacklog(false) will set the job's backlog status to false".
> > Suppose we have a job with two operators, where operatorA invokes
> > setIsProcessingBacklog(false) and operatorB invokes
> > setIsProcessingBacklog(true). There will be a conflict if we use the
> > semantics of "any invocation of setIsProcessingBacklog(false) will set the
> > job's backlog status to false".
> >
> > Would this answer your question?
> >
> > Best,
> > Dong
> >
> >
> > > understand. The API interface "ctx.setIsProcessingBacklog(boolean)"
> > > allows users to invoke "setIsProcessingBacklog(false)". And FLIP-309
> > > also says "MySQL CDC source should report isProcessingBacklog=false
> > > at the beginning of the changelog stage." If not, maybe we need to
> > > revisit FLIP-309.
> >
> >
> > > Best,
> > > Jark
> > >
> > >
> > >
> > > On Fri, 15 Sept 2023 at 08:41, Dong Lin  wrote:
> > >
> > > > Hi Jark,
> > > >
> > > > Do you have any follow-up comment?
> > > >
> > > > My gut feeling is that if we need to support per-source watermark lag
> > > > specification in the future (not sure we have a use-case for this
> > > > right now), we can add such a config with a follow-up FLIP. The
> > > > job-level config will still be useful as it makes users'
> > > > configuration simpler for common scenarios.
> 
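A hypothetical Java sketch of the distinction Dong draws above between watermark delay and watermark-lag-threshold. It assumes a bounded-out-of-orderness style watermark (maximum event time seen minus the configured delay, ignoring off-by-one details) and watermarkLag measured as wall-clock time minus the current watermark, which is our reading of FLIP-33. All names are illustrative, not Flink API.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical sketch contrasting watermark delay with a watermark-lag threshold. */
public class DelayVersusLagSketch {

    /** Watermark delay: how far the watermark trails the maximum event time seen. */
    public static Instant watermark(Instant maxEventTimeSeen, Duration delay) {
        return maxEventTimeSeen.minus(delay);
    }

    /** Watermark lag (assumed definition): how far the watermark trails wall-clock time. */
    public static Duration watermarkLag(Instant now, Instant watermark) {
        return Duration.between(watermark, now);
    }

    /** Backlog if the data is "already stale" according to the lag threshold. */
    public static boolean isBacklog(Duration lag, Duration threshold) {
        return lag.compareTo(threshold) > 0;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2023-09-15T12:00:00Z");
        Duration staleThreshold = Duration.ofMinutes(10); // one job-level threshold
        Duration delay1 = Duration.ofSeconds(5);  // "table1"
        Duration delay2 = Duration.ofSeconds(10); // "table2"

        // Both tables are still reading data that is about one hour old.
        Instant oldData = now.minus(Duration.ofHours(1));
        System.out.println(isBacklog(watermarkLag(now, watermark(oldData, delay1)), staleThreshold)); // true
        System.out.println(isBacklog(watermarkLag(now, watermark(oldData, delay2)), staleThreshold)); // true

        // Both tables have caught up to data that is one second old.
        Instant freshData = now.minusSeconds(1);
        System.out.println(isBacklog(watermarkLag(now, watermark(freshData, delay1)), staleThreshold)); // false
        System.out.println(isBacklog(watermarkLag(now, watermark(freshData, delay2)), staleThreshold)); // false
    }
}
```

Both tables switch backlog status at the same point even though their watermark delays differ, because the delays are negligible next to a staleness threshold measured in minutes.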

2023-09-15 Thread Dong Lin
Hi Jark,

Please see my reply inline.

On Fri, Sep 15, 2023 at 2:01 PM Jark Wu  wrote:

> Hi Dong,
>
> > Note that we cannot simply enforce the semantics of "any invocation of
> > setIsProcessingBacklog(false) will set the job's backlog status to false".
> > Suppose we have a job with two operators, where operatorA invokes
> > setIsProcessingBacklog(false) and operatorB invokes
> > setIsProcessingBacklog(true). There will be a conflict if we use the
> > semantics of "any invocation of setIsProcessingBacklog(false) will set the
> > job's backlog status to false".
>
> So it should set the job's backlog status to false if the job has only a
> single source, right?
>

As of FLIP-309's implementation, with the constraint that there is only
one API to set backlog=true, your understanding is correct.

However, it might be useful to recall that FLIP-309 was proposed with the
explicit plan to add more strategies to set backlog=true, which is
documented in its future work section. The statement above, which assumes
there is only one strategy, will not hold in the long term.

With this in mind, a better and more durable way to interpret the
semantics of setIsProcessingBacklog() would be this: "it should set the
job's backlog status to false if the job has only a single source AND there
is no other strategy to set backlog=true".


> Could you elaborate on the behavior if there is a job with a single source,
> and the watermark lag exceeds the configured value (should set backlog to
> true?), but the source invokes "setIsProcessingBacklog(false)"? Or the
> inverse one, the source invokes "setIsProcessingBacklog(false)" first, but
> the watermark lag exceeds the configured value.
>

If setIsProcessingBacklog(false) is invoked and the watermark lag exceeds
the configured value, then the job's backlog status is set to true. The
same holds for the inverse order.

The rationale is that if any strategy is triggered and says backlog=true,
then the job's backlog status should be true. Otherwise, the job's backlog
status is false.

The key idea for avoiding conflicts is that we will not add any API with the
capability to explicitly set the job's backlog status to false. A job's
backlog status is false if and only if no rule says it should be true.
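The "any true wins" rule described above can be sketched as a small Java model. This is a hypothetical illustration, not Flink's implementation: the class BacklogStatusSketch, its report(...) method and the strategy names ("source", "watermark-lag-rule", "operatorA", "operatorB") are invented for this example. A call with false only withdraws that caller's own earlier true, so the result does not depend on call order.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical model of the semantics discussed in this thread; not Flink code.
 * Each strategy (a source, or a job-level rule) can only raise or withdraw its
 * own "backlog=true" signal. Nothing can force the job-level status to false.
 */
public class BacklogStatusSketch {
    // Names of the strategies that currently say backlog=true.
    private final Set<String> strategiesReportingBacklog = new HashSet<>();

    /** Models a setIsProcessingBacklog(isBacklog)-style call from one strategy. */
    public void report(String strategy, boolean isBacklog) {
        if (isBacklog) {
            strategiesReportingBacklog.add(strategy);
        } else {
            // "false" only unsets this strategy's own earlier "true".
            strategiesReportingBacklog.remove(strategy);
        }
    }

    /** The job is in backlog iff at least one strategy currently says true. */
    public boolean isProcessingBacklog() {
        return !strategiesReportingBacklog.isEmpty();
    }

    public static void main(String[] args) {
        // Single source reports false, then the (hypothetical) lag rule fires.
        BacklogStatusSketch job = new BacklogStatusSketch();
        job.report("source", false);
        job.report("watermark-lag-rule", true);
        System.out.println(job.isProcessingBacklog()); // true

        // The inverse order gives the same result.
        BacklogStatusSketch inverse = new BacklogStatusSketch();
        inverse.report("watermark-lag-rule", true);
        inverse.report("source", false);
        System.out.println(inverse.isProcessingBacklog()); // true

        // Two operators disagreeing: no conflict, true wins.
        BacklogStatusSketch twoOps = new BacklogStatusSketch();
        twoOps.report("operatorA", false);
        twoOps.report("operatorB", true);
        System.out.println(twoOps.isProcessingBacklog()); // true

        // Once operatorB withdraws its signal, no rule says true.
        twoOps.report("operatorB", false);
        System.out.println(twoOps.isProcessingBacklog()); // false
    }
}
```

Modelling "false" as a withdrawal rather than an assignment is what removes the conflict between operatorA and operatorB.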

BTW, I understand that setIsProcessingBacklog(false) appears to suggest
that the job's backlog status is set to false. We can certainly update its
Javadoc to make the semantics clearer. If you have any suggestion for a
better name for this method, we can also update it accordingly.

What do you think?


> This is the conflict I'm concerned about.
>
> Best,
> Jark
>
> On Fri, 15 Sept 2023 at 12:00, Dong Lin  wrote:
>
> > Hi Jark,
> >
> > Please see my comments inline.
> >
> > On Fri, Sep 15, 2023 at 10:35 AM Jark Wu  wrote:
> >
> > > Hi Dong,
> > >
> > > Please see my comments inline below.
> >
> >
> > > > Hmm.. can you explain what you mean by "different watermark delay
> > > > definitions for each source"?
> > >
> > > For example, "table1" defines a watermark with delay 5 seconds,
> > > "table2" defines a watermark with delay 10 seconds. They have different
> > > watermark delay definitions. So it is also reasonable they have different
> > > watermark lag definitions, e.g., "table1" allows "10mins" and "table2"
> > > allows "20mins".
> > >
> >
> > I think the watermark delay you mentioned above is conceptually /
> > fundamentally different from the watermark-lag-threshold proposed in this
> > FLIP.
> >
> > It might be useful to revisit the semantics of these two concepts:
> > - watermark delay is used to account for the maximum amount of orderliness
> > that users expect (or willing to wait for) for records from a given source.
> > - watermark-lag-threshold is used to define when processing latency is no
> > longer important (e.g. because data is already stale).
> >
> > Even though users might expect different out of orderliness for different
> > sources, users do not necessarily have different definitions / thresholds
> > for when a record is considered "already stale".
> >
> >
> > >
> > > > I think there is probably misunderstanding here. FLIP-309 does NOT
> > > > directly specify when backlog is false. It is intentionally specified
> > > > in such a way that there will not be any conflict between these rules.
> > >
> > > Do you mean FLIP-309 doesn't allow to specify backlog to be false?
> > > Is this mentioned in FLIP-309? This is completely different from what I
> > >
> >
> > Can you explain what you mean by "allow to specify backlog to be false"?
> >
> > If what you mean is that "can invoke setIsProcessingBacklog(false)", then
> > FLIP-309 supports doing this.
> >
> > If what you mean is that "any invocation of setIsProcessingBacklog(false)
> > will set the job's backlog status to false", then FLIP-309 does not support
> > this. I believe the existing Java doc of this API and FLIP-309 is
> > compatible with this explanation.
> >
> > Note that we can not simply enforce the 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-15 Thread Jark Wu
Hi Dong,

> Note that we can not simply enforce the semantics of "any invocation of
> setIsProcessingBacklog(false) will set the job's backlog status to false".
> Suppose we have a job with two operators, where operatorA invokes
> setIsProcessingBacklog(false) and operatorB invokes
> setIsProcessingBacklog(true). There will be conflict if we use the
> semantics of "any invocation of setIsProcessingBacklog(false) will set the
> job's backlog status to false".

So it should set the job's backlog status to false if the job has only a
single source, right?

Could you elaborate on the behavior if there is a job with a single source,
the watermark lag exceeds the configured value (which should set backlog to
true?), but the source invokes "setIsProcessingBacklog(false)"? Or the inverse
case: the source invokes "setIsProcessingBacklog(false)" first, and then the
watermark lag exceeds the configured value.

This is the conflict I'm concerned about.

Best,
Jark

On Fri, 15 Sept 2023 at 12:00, Dong Lin  wrote:

> Hi Jark,
>
> Please see my comments inline.
>
> On Fri, Sep 15, 2023 at 10:35 AM Jark Wu  wrote:
>
> > Hi Dong,
> >
> > Please see my comments inline below.
>
>
> > > Hmm.. can you explain what you mean by "different watermark delay
> > > definitions for each source"?
> >
> > For example, "table1" defines a watermark with delay 5 seconds,
> > "table2" defines a watermark with delay 10 seconds. They have different
> > watermark delay definitions. So it is also reasonable they have different
> > watermark lag definitions, e.g., "table1" allows "10mins" and "table2"
> > allows "20mins".
> >
>
> I think the watermark delay you mentioned above is conceptually /
> fundamentally different from the watermark-lag-threshold proposed in this
> FLIP.
>
> It might be useful to revisit the semantics of these two concepts:
> - watermark delay is used to account for the maximum amount of orderliness
> that users expect (or willing to wait for) for records from a given source.
> - watermark-lag-threshold is used to define when processing latency is no
> longer important (e.g. because data is already stale).
>
> Even though users might expect different out of orderliness for different
> sources, users do not necessarily have different definitions / thresholds
> for when a record is considered "already stale".
>
>
> >
> > > I think there is probably misunderstanding here. FLIP-309 does NOT
> > > directly specify when backlog is false. It is intentionally specified in
> > > such a way that there will not be any conflict between these rules.
> >
> > Do you mean FLIP-309 doesn't allow to specify backlog to be false?
> > Is this mentioned in FLIP-309? This is completely different from what I
> >
>
> Can you explain what you mean by "allow to specify backlog to be false"?
>
> If what you mean is that "can invoke setIsProcessingBacklog(false)", then
> FLIP-309 supports doing this.
>
> If what you mean is that "any invocation of setIsProcessingBacklog(false)
> will set the job's backlog status to false", then FLIP-309 does not support
> this. I believe the existing Java doc of this API and FLIP-309 is
> compatible with this explanation.
>
> Note that we can not simply enforce the semantics of "any invocation of
> setIsProcessingBacklog(false) will set the job's backlog status to false".
> Suppose we have a job with two operators, where operatorA invokes
> setIsProcessingBacklog(false) and operatorB invokes
> setIsProcessingBacklog(true). There will be conflict if we use the
> semantics of "any invocation of setIsProcessingBacklog(false) will set the
> job's backlog status to false".
>
> Would this answer your question?
>
> Best,
> Dong
>
>
> > understand. From the API interface "ctx.setIsProcessingBacklog(boolean)",
> > it allows users to invoke "setIsProcessingBacklog(false)". And FLIP-309
> > also says "MySQL CDC source should report isProcessingBacklog=false
> > at the beginning of the changelog stage." If not, maybe we need to revisit
> > FLIP-309.
>
>
> > Best,
> > Jark
> >
> >
> >
> > On Fri, 15 Sept 2023 at 08:41, Dong Lin  wrote:
> >
> > > Hi Jark,
> > >
> > > Do you have any follow-up comment?
> > >
> > > My gut feeling is that suppose we need to support per-source watermark
> > > lag specification in the future (not sure we have a use-case for this
> > > right now), we can add such a config in the future with a follow-up FLIP.
> > > The job-level config will still be useful as it makes users'
> > > configuration simpler for common scenarios.
> > >
> > > If it is OK, can we agree to make incremental progress for Flink and
> > > start a voting thread for this FLIP?
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > > On Mon, Sep 11, 2023 at 4:41 PM Jark Wu  wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > Please see my comments inline.
> > > >
> > > > >  As a result, the proposed job-level
> > > > > config will be applied only in the changelog stage. So there is no
> > > > > difference between these two approaches in this 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-14 Thread Dong Lin
Hi Jark,

Please see my comments inline.

On Fri, Sep 15, 2023 at 10:35 AM Jark Wu  wrote:

> Hi Dong,
>
> Please see my comments inline below.


> > Hmm.. can you explain what you mean by "different watermark delay
> > definitions for each source"?
>
> For example, "table1" defines a watermark with delay 5 seconds,
> "table2" defines a watermark with delay 10 seconds. They have different
> watermark delay definitions. So it is also reasonable they have different
> watermark lag definitions, e.g., "table1" allows "10mins" and "table2"
> allows "20mins".
>

I think the watermark delay you mentioned above is conceptually /
fundamentally different from the watermark-lag-threshold proposed in this
FLIP.

It might be useful to revisit the semantics of these two concepts:
- watermark delay is used to account for the maximum amount of
out-of-orderliness that users expect (or are willing to wait for) for
records from a given source.
- watermark-lag-threshold is used to define when processing latency is no
longer important (e.g. because data is already stale).

Even though users might expect different out-of-orderliness for different
sources, users do not necessarily have different definitions / thresholds
for when a record is considered "already stale".
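The distinction between the two concepts can be illustrated with a hypothetical Java sketch. It assumes a simplified bounded-out-of-orderness watermark (max event time minus the configured delay, ignoring off-by-one details) and a hypothetical job-level rule that flags backlog when wall-clock time minus the watermark exceeds one shared threshold. The class and method names are illustrative, not Flink APIs; the 5s/10s delays come from the "table1"/"table2" example in this thread, and the 10-minute threshold is invented.

```java
import java.time.Duration;

/** Hypothetical illustration; not Flink code. */
public class WatermarkLagSketch {
    /** Simplified watermark of a source tolerating `delay` of out-of-orderness. */
    public static long watermark(long maxEventTimeMillis, Duration delay) {
        return maxEventTimeMillis - delay.toMillis();
    }

    /** Hypothetical job-level rule: backlog iff the watermark lag exceeds the threshold. */
    public static boolean isBacklog(long nowMillis, long watermarkMillis, Duration lagThreshold) {
        long lagMillis = nowMillis - watermarkMillis;
        return lagMillis > lagThreshold.toMillis();
    }

    public static void main(String[] args) {
        long now = 1_000_000_000L;
        Duration threshold = Duration.ofMinutes(10);

        // table1 (delay 5s) and table2 (delay 10s), both caught up: latest event is 1s old.
        long wm1 = watermark(now - 1_000, Duration.ofSeconds(5));
        long wm2 = watermark(now - 1_000, Duration.ofSeconds(10));
        System.out.println(isBacklog(now, wm1, threshold)); // false
        System.out.println(isBacklog(now, wm2, threshold)); // false

        // table2 replaying data that is one hour old: the same threshold flags it.
        long wm2Stale = watermark(now - Duration.ofHours(1).toMillis(), Duration.ofSeconds(10));
        System.out.println(isBacklog(now, wm2Stale, threshold)); // true
    }
}
```

The per-source delays only shift the watermark by seconds, so a single staleness threshold still separates "caught up" from "replaying stale data" for both tables.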


>
> > I think there is probably misunderstanding here. FLIP-309 does NOT
> > directly specify when backlog is false. It is intentionally specified in
> > such a way that there will not be any conflict between these rules.
>
> Do you mean FLIP-309 doesn't allow to specify backlog to be false?
> Is this mentioned in FLIP-309? This is completely different from what I
>

Can you explain what you mean by "allow to specify backlog to be false"?

If what you mean is that "can invoke setIsProcessingBacklog(false)", then
FLIP-309 supports doing this.

If what you mean is that "any invocation of setIsProcessingBacklog(false)
will set the job's backlog status to false", then FLIP-309 does not support
this. I believe the existing Javadoc of this API and FLIP-309 are
compatible with this explanation.

Note that we cannot simply enforce the semantics of "any invocation of
setIsProcessingBacklog(false) will set the job's backlog status to false".
Suppose we have a job with two operators, where operatorA invokes
setIsProcessingBacklog(false) and operatorB invokes
setIsProcessingBacklog(true). There will be a conflict if we use the
semantics of "any invocation of setIsProcessingBacklog(false) will set the
job's backlog status to false".

Would this answer your question?

Best,
Dong


> understand. From the API interface "ctx.setIsProcessingBacklog(boolean)",
> it allows users to invoke "setIsProcessingBacklog(false)". And FLIP-309
> also says "MySQL CDC source should report isProcessingBacklog=false
> at the beginning of the changelog stage." If not, maybe we need to revisit
> FLIP-309.


> Best,
> Jark
>
>
>
> On Fri, 15 Sept 2023 at 08:41, Dong Lin  wrote:
>
> > Hi Jark,
> >
> > Do you have any follow-up comment?
> >
> > My gut feeling is that suppose we need to support per-source watermark
> > lag specification in the future (not sure we have a use-case for this
> > right now), we can add such a config in the future with a follow-up FLIP.
> > The job-level config will still be useful as it makes users'
> > configuration simpler for common scenarios.
> >
> > If it is OK, can we agree to make incremental progress for Flink and
> > start a voting thread for this FLIP?
> >
> > Thanks,
> > Dong
> >
> >
> > On Mon, Sep 11, 2023 at 4:41 PM Jark Wu  wrote:
> >
> > > Hi Dong,
> > >
> > > Please see my comments inline.
> > >
> > > >  As a result, the proposed job-level
> > > > config will be applied only in the changelog stage. So there is no
> > > > difference between these two approaches in this particular case,
> right?
> > >
> > > How the job-level config can be applied ONLY in the changelog stage?
> > > I think it is only possible if it is implemented by the CDC source
> > itself,
> > > because the framework doesn't know which stage of the source is.
> > > Know that the CDC source may emit watermarks with a very small lag
> > > in the snapshot stage, and the job-level config may turn the backlog
> > > status into false.
> > >
> > > > On the other hand, per-source config will be necessary if users want
> to
> > > > apply different watermark lag thresholds for different sources in the
> > > same
> > > > job.
> > >
> > > We also have different watermark delay definitions for each source,
> > > I think this's also reasonable and necessary to have different
> watermark
> > > lags.
> > >
> > >
> > > > Each source can have its own rule that specifies when the backlog can
> > be
> > > true
> > > > (e.g. MySql CDC says the backlog should be true during the snapshot
> > > stage).
> > > > And we can have a job-level config that specifies when the backlog
> > should
> > > > be true. Note that it is designed in such a way that none of these
> > rules
> > > > specify when the backlog should be 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-14 Thread Jark Wu
> > > > > > >>>>>>>>>> Regarding the limitations of the watermark-based solution:
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> 1. The current solution can support jobs with sources that have
> > > > > > >>>>>>>>>> event time. Users can always define a watermark at the source
> > > > > > >>>>>>>>>> operator, even if it's not used by downstream operators, such as
> > > > > > >>>>>>>>>> streaming join and unbounded aggregate.
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> 2. I don't believe it's accurate to say that the watermark lag
> > > > > > >>>>>>>>>> will keep increasing if no data is generated in Kafka. The
> > > > > > >>>>>>>>>> watermark lag and backlog status are determined at the moment
> > > > > > >>>>>>>>>> when the watermark is emitted to the downstream operator. If no
> > > > > > >>>>>>>>>> data is emitted from the source, the watermark lag and backlog
> > > > > > >>>>>>>>>> status will not be updated. If the WatermarkStrategy with
> > > > > > >>>>>>>>>> idleness is used, the source becomes non-backlog when it
> > > > > > >>>>>>>>>> becomes idle.
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> 3. I think watermark lag is more intuitive to determine if a job
> > > > > > >>>>>>>>>> is processing backlog data. Even when using pending records, it
> > > > > > >>>>>>>>>> faces a similar issue. For example, if the source has 1K pending
> > > > > > >>>>>>>>>> records, those records can span from 1 day to 1 hour to 1
> > > > > > >>>>>>>>>> second. If the records span 1 day, it's probably best to
> > > > > > >>>>>>>>>> optimize for throughput. If they span 1 hour, it depends on the
> > > > > > >>>>>>>>>> business logic. If they span 1 second, optimizing for latency
> > > > > > >>>>>>>>>> is likely the better choice.
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> In summary, I believe the watermark-based solution is a
> > > > > > >>>>>>>>>> superior choice for the target use case where watermark

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-14 Thread Dong Lin
> > > > > >>>>>>>>>> ... even if it's not used by downstream operators, such as
> > > > > >>>>>>>>>> streaming join and unbounded aggregate.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> 2. I don't believe it's accurate to say that the watermark lag
> > > > > >>>>>>>>>> will keep increasing if no data is generated in Kafka. The
> > > > > >>>>>>>>>> watermark lag and backlog status are determined at the moment
> > > > > >>>>>>>>>> when the watermark is emitted to the downstream operator. If no
> > > > > >>>>>>>>>> data is emitted from the source, the watermark lag and backlog
> > > > > >>>>>>>>>> status will not be updated. If the WatermarkStrategy with
> > > > > >>>>>>>>>> idleness is used, the source becomes non-backlog when it
> > > > > >>>>>>>>>> becomes idle.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> 3. I think watermark lag is more intuitive to determine if a job
> > > > > >>>>>>>>>> is processing backlog data. Even when using pending records, it
> > > > > >>>>>>>>>> faces a similar issue. For example, if the source has 1K pending
> > > > > >>>>>>>>>> records, those records can span from 1 day to 1 hour to 1
> > > > > >>>>>>>>>> second. If the records span 1 day, it's probably best to
> > > > > >>>>>>>>>> optimize for throughput. If they span 1 hour, it depends on the
> > > > > >>>>>>>>>> business logic. If they span 1 second, optimizing for latency
> > > > > >>>>>>>>>> is likely the better choice.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> In summary, I believe the watermark-based solution is a
> > > > > >>>>>>>>>> superior choice for the target use case where watermark/event
> > > > > >>>>>>>>>> time can be defined. Additionally, I haven't come across a
> > > > > >>>>>>>>>> scenario that requires low-latency processing and reads from a
> > > > > >>>>>>>>>> source that cannot define watermarks. If we encounter such a
> > > > > >>>>>>>>>> use case, we can create another FLIP to address those needs in
> > > > > >>>>>>>>>> the future. What do you think?
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Best,
> > > > > >>>>>>>>>> Xuannan
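Point 2 in the message above (the backlog status is recomputed only when a watermark is emitted, and an idle source becomes non-backlog) can be sketched as a small state holder. The class, its callbacks and the 10-minute threshold are hypothetical; in particular, onIdle() stands in for whatever signal an idleness-aware WatermarkStrategy actually produces, and nothing here is a Flink API.

```java
import java.time.Duration;

/**
 * Hypothetical sketch, not Flink code: the backlog flag is recomputed only
 * when a watermark is emitted downstream, and cleared when the source is idle.
 */
public class EmitTimeBacklogSketch {
    private final long lagThresholdMillis;
    private boolean backlog = false;

    public EmitTimeBacklogSketch(Duration lagThreshold) {
        this.lagThresholdMillis = lagThreshold.toMillis();
    }

    /** Re-evaluates the flag at the moment a watermark is emitted. */
    public void onWatermarkEmitted(long watermarkMillis, long nowMillis) {
        backlog = nowMillis - watermarkMillis > lagThresholdMillis;
    }

    /** Assumed hook: an idleness-aware watermark strategy marks the source idle. */
    public void onIdle() {
        backlog = false;
    }

    public boolean isBacklog() {
        return backlog;
    }

    public static void main(String[] args) {
        EmitTimeBacklogSketch source = new EmitTimeBacklogSketch(Duration.ofMinutes(10));
        long now = 1_000_000_000L;

        // Caught-up watermark: not backlog.
        source.onWatermarkEmitted(now - 1_000, now);
        System.out.println(source.isBacklog()); // false

        // No data arrives: no watermark is emitted, so the flag is not recomputed
        // and the growing wall-clock gap does not turn it into backlog.
        System.out.println(source.isBacklog()); // false

        // Later, stale data is replayed: the emitted watermark is an hour behind.
        long later = now + Duration.ofHours(2).toMillis();
        source.onWatermarkEmitted(later - Duration.ofHours(1).toMillis(), later);
        System.out.println(source.isBacklog()); // true

        // The source goes idle: it becomes non-backlog.
        source.onIdle();
        System.out.println(source.isBacklog()); // false
    }
}
```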

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-12 Thread Dong Lin

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-11 Thread Jark Wu

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-10 Thread Dong Lin
> > > >>>>>>>>>>> ... determine "isProcessingBacklog".
> > > >>>>>>>>>>> From my point of view, there are 3 limitations of the current proposal:
> > > >>>>>>>>>>> 1. It doesn't cover jobs that don't have watermark/event-time defined,
> > > >>>>>>>>>>> for example streaming join and unbounded aggregate. We may still need
> > > >>>>>>>>>>> to figure out solutions for them.
> > > >>>>>>>>>>> 2. Watermark lag can not be trusted, because it increases unlimited if
> > > >>>>>>>>>>> no data is generated in the Kafka.
> > > >>>>>>>>>>> But in this case, there is no backlog at all.
> > > >>>>>>>>>>> 3. Watermark lag is hard to reflect the amount of backlog. If the
> > > >>>>>>>>>>> watermark lag is 1 day or 1 hour or 1 second,
> > > >>>>>>>>>>> there is possibly only 1 pending record there, which means no backlog
> > > >>>>>>>>>>> at all.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Therefore, IMO, watermark maybe not the ideal metric used to determine
> > > >>>>>>>>>>> "isProcessingBacklog".
> > > >>>>>>>>>>> What we need is something that reflects the number of records
> > > >>>>>>>>>>> unprocessed by the job.
> > > >>>>>>>>>>> Actually, that is the "pendingRec

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-09 Thread Jark Wu
> > >>>>>>>>>> For example, if the source has 1K pending records, those
> > >>>>>>>>>> records can span from 1 day to 1 hour to 1 second. If the records
> > >>>>>>>>>> span 1 day, it's probably best to optimize for throughput. If they
> > >>>>>>>>>> span 1 hour, it depends on the business logic. If they span 1
> > >>>>>>>>>> second, optimizing for latency is likely the better choice.
> > >>>>>>>>>>
> > >>>>>>>>>> In summary, I believe the watermark-based solution is a superior
> > >>>>>>>>>> choice for the target use case where watermark/event time can be
> > >>>>>>>>>> defined. Additionally, I haven't come across a scenario that
> > >>>>>>>>>> requires low-latency processing and reads from a source that cannot
> > >>>>>>>>>> define watermarks. If we encounter such a use case, we can create
> > >>>>>>>>>> another FLIP to address those needs in the future. What do you
> > >>>>>>>>>> think?
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> Best,
> > >>>>>>>>>> Xuannan
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>> On Aug 20, 2023, at 23:27, Jark Wu <imj...@gmail.com> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>> Hi Xuannan,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thanks for opening this discussion.
> > >>>>>>>>>>>
> > >>>>>>>>>>> This current proposal may work in the mentioned watermark cases.
> > >>>>>>>>>>> However, it seems this is not a general solution for sources to
> > >>>>>>>>>>> determine "isProcessingBacklog".
> > >>>>>>>>>>> From my point of view, there are 3 limitations of the current proposal:
> > >>>>>>>>>>> 1. It doesn't cover jobs that don't have watermark/event-time defined,
> > >>>>>>>>>>> for example streaming join and unbounded aggregate. We may still need
> > >>>>>>>>>>> to figure out solutions for them.
> > >>>>>>>>>>> 2. Watermark lag can not be trusted, because it increases unlimited if
> > >>>>>>>>>>> no data is generated in the Kafka.
> > >>>>>>>>>>> But in this case, there is no backlog at all.
> > >>>>>>>>>>> 3. Watermark lag is hard to reflect the amount of backlog. If the
> > >>>>>>>>>>> watermark lag is 1 day or 1 hour or 1 second,
> > >>>>>>>>>>> there is possibly only 1 pending record there, which means no backlog
> > >>>>>>>>>>> at all.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Therefore, IMO, watermark maybe not the ideal metric used to determine
> > >>>>>>>>>>> "isProcessingBacklog".

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-08 Thread Xuannan Su
> >>>>>>>>>>> ... there is possibly only 1 pending record there, which means no
> >>>>>>>>>>> backlog at all.
> >>>>>>>>>>>
> >>>>>>>>>>> Therefore, IMO, watermark maybe not the ideal metric used to determine
> >>>>>>>>>>> "isProcessingBacklog".
> >>>>>>>>>>> What we need is something that reflects the number of records
> >>>>>>>>>>> unprocessed by the job.
> >>>>>>>>>>> Actually, that is the "pendingRecords" metric proposed in FLIP-33 [1]
> >>>>>>>>>>> and has been implemented by Kafka source.
> >>>>>>>>>>> Did you consider using "pendingRecords" metric to determine
> >>>>>>>>>>> "isProcessingBacklog"?
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Jark
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, 15 Aug 2023 at 12:04, Xintong Song <tonysong...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Sounds good to me.
> >>>>>>>>>>>>
> >>>>>>>>>>>> It is true that, if we are introducing the generalized watermark,
> >>>>>>>>>>>> there will be other watermark related concepts / configurations
> >>>>>>>>>>>> that need to be updated anyway.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Xintong
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Tue, Aug 15, 2023 at 11:30 AM Xuannan Su <suxuanna...@gmail.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Xintong,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thank you for your suggestion.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> After considering the idea of using a general configuration key, I
> >>>>>>>>>>>>> think it may not be a good idea for the reasons below.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> While I agree that using a more general configuration key provides
> >>>>>>>>>>>>> us with the flexibility to switch to other approaches to calculate
> >>>>>>>>>>>>> the lag in the future, the downside is that it may cause confusion
> >>>>>>>>>>>>> for users. We currently have fetchEventTimeLag, emitEventTimeLag,
> >>>>>>>>>>>>> and watermarkLag in the source, and it is not clear which specific
> >>>>>>>>>>>>> lag we are referring to. With the potential introduction of the
> >>>>>>>>>>>>> Generalized Watermark mechanism in the future, if I understand
> >>>>>>>>>>>>> correctly, a watermark won't necessarily need to be a timestamp. I
> >>>>>>>>>>>>> am concerned that the general configuration key may not be enough
> >>>>>>>>>>>>> to cover all the use cases and we will need to introduce a general
> >>>>>>>>>>>>> way to determine the backlog status regardless.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> For the reasons above, I prefer introducing the configuration as
> >>>>>>>>>>>>> is, and changing it later with a deprecation process or migration
> >>>>>>>>>>>>> process. What do you think?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>> Xuannan
> >>>>>>>>>>>>> On Aug 14, 2023, 14:09 +0800, Xintong Song <
> >>>> tonysong...@gmail.com
> >>>>>>>>>> <mailto:tonysong...@gmail.com>>,
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>> Thanks for the explanation.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I wonder if it makes sense to not expose this detail via
> >> the
> >>>>>>>>>>>>> configuration
> >>>>>>>>>>>>>> option. To be specific, I suggest not mentioning the
> >>>> "watermark"
> >>>>>>>>>>>> keyword
> >>>>>>>>>>>>> in
> >>>>>>>>>>>>>> the configuration key and description.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> - From the users' perspective, I think they only need to
> >> know
> >>>>>>>> there's
> >>>>>>>>>> a
> >>>>>>>>>>>>>> lag higher than the given threshold, Flink will consider
> >>>> latency
> >>>>>> of
> >>>>>>>>>>>>>> individual records as less important and prioritize
> >> throughput
> >>>>>> over
> >>>>>>>>>> it.
> >>>>>>>>>>>>>> They don't really need the details of how the lags are
> >>>> calculated.
> >>>>>>>>>>>>>> - For the internal implementation, I also think using
> >>>> watermark
> >>>>>> lags
> >>>>>>>>>> is
> >>>>>>>>>>>>>> a good idea, for the reasons you've already mentioned.
> >>>> However,
> >>>>>> it's
> >>>>>>>>>>>> not
> >>>>>>>>>>>>>> the only possible option. Hiding this detail from users
> >> would
> >>>> give
> >>>>>>>> us
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>> flexibility to switch to other approaches if needed in
> >> future.
> >>>>>>>>>>>>>> - We are currently working on designing the
> >> ProcessFunction
> >>>> API
> >>>>>>>>>>>>>> (consider it as a DataStream API V2). There's an idea to
> >>>>>> introduce a
> >>>>>>>>>>>>>> Generalized Watermark mechanism, where basically the
> >>>> watermark can
> >>>>>>>> be
> >>>>>>>>>>>>>> anything that needs to travel along the data-flow with
> >> certain
> >>>>>>>>>>>> alignment
> >>>>>>>>>>>>>> strategies, and event time watermark would be one specific
> >>>> case of
> >>>>>>>> it.
> >>>>>>>>>>>>> This
> >>>>>>>>>>>>>> is still an idea and has not been discussed and agreed on
> >> by
> >>>> the
> >>>>>>>>>>>>> community,
> >>>>>>>>>>>>>> and we are preparing a FLIP for it. But if we are going
> >> for
> >>>> it,
> >>>>>> the
> >>>>>>>>>>>>> concept
> >>>>>>>>>>>>>> "watermark-lag-threshold" could be ambiguous.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I do not intend to block the FLIP on this. I'd also be
> >> fine
> >>>> with
> >>>>>>>>>>>>>> introducing the configuration as is, and changing it
> >> later, if
> >>>>>>>> needed,
> >>>>>>>>>>>>> with
> >>>>>>>>>>>>>> a regular deprecation and migration process. Just making
> >> my
> >>>>>>>>>>>> suggestions.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Xintong
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Mon, Aug 14, 2023 at 12:00 PM Xuannan Su <
> >>>>>> suxuanna...@gmail.com
> >>>>>>>>>> <mailto:suxuanna...@gmail.com>>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi Xintong,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for the reply.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I have considered using the timestamp in the records to
> >>>> determine
> >>>>>>>> the
> >>>>>>>>>>>>>>> backlog status, and decided to use watermark at the end.
> >> By
> >>>>>>>>>>>> definition,
> >>>>>>>>>>>>>>> watermark is the time progress indication in the data
> >>>> stream. It
> >>>>>>>>>>>>> indicates
> >>>>>>>>>>>>>>> the stream’s event time has progressed to some specific
> >>>> time. On
> >>>>>>>> the
> >>>>>>>>>>>>> other
> >>>>>>>>>>>>>>> hand, timestamp in the records is usually used to
> >> generate
> >>>> the
> >>>>>>>>>>>>> watermark.
> >>>>>>>>>>>>>>> Therefore, it appears more appropriate and intuitive to
> >>>> calculate
> >>>>>>>> the
> >>>>>>>>>>>>> event
> >>>>>>>>>>>>>>> time lag by watermark and determine the backlog status.
> >> And
> >>>> by
> >>>>>>>> using
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>> watermark, we can easily deal with the out-of-order and
> >> the
> >>>>>>>> idleness
> >>>>>>>>>>>>> of the
> >>>>>>>>>>>>>>> data.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Please let me know if you have further questions.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>> Xuannan
> >>>>>>>>>>>>>>> On Aug 10, 2023, 20:23 +0800, Xintong Song <
> >>>>>> tonysong...@gmail.com
> >>>>>>>>>> <mailto:tonysong...@gmail.com>>,
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>> Thanks for preparing the FLIP, Xuannan.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> +1 in general.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> A quick question, could you explain why we are relying
> >> on
> >>>> the
> >>>>>>>>>>>>> watermark
> >>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>> emitting the record attribute? Why not use timestamps
> >> in the
> >>>>>>>>>>>>> records? I
> >>>>>>>>>>>>>>>> don't see any concern in using watermarks. Just
> >> wondering if
> >>>>>>>>>>>> there's
> >>>>>>>>>>>>> any
> >>>>>>>>>>>>>>>> deep considerations behind this.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Xintong
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su <
> >>>>>> suxuanna...@gmail.com
> >>>>>>>>>> <mailto:suxuanna...@gmail.com>>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I am opening this thread to discuss FLIP-328: Allow
> >> source
> >>>>>>>>>>>>> operators to
> >>>>>>>>>>>>>>>>> determine isProcessingBacklog based on watermark
> >> lag[1]. We
> >>>>>> had a
> >>>>>>>>>>>>>>> several
> >>>>>>>>>>>>>>>>> discussions with Dong Ling about the design, and thanks
> >>>> for all
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> valuable advice.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> The FLIP aims to target the use-case where user want to
> >>>> run a
> >>>>>>>>>>>> Flink
> >>>>>>>>>>>>>>> job to
> >>>>>>>>>>>>>>>>> backfill historical data in a high throughput manner
> >> and
> >>>>>> continue
> >>>>>>>>>>>>>>>>> processing real-time data with low latency. Building
> >> upon
> >>>> the
> >>>>>>>>>>>>> backlog
> >>>>>>>>>>>>>>>>> concept introduced in FLIP-309[2], this proposal
> >> enables
> >>>>>> sources
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>> report
> >>>>>>>>>>>>>>>>> their status of processing backlog based on the
> >> watermark
> >>>> lag.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> We would greatly appreciate any comments or feedback
> >> you
> >>>> may
> >>>>>> have
> >>>>>>>>>>>>> on
> >>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>> proposal.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>> Xuannan
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>
> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> >>>>>>>>>> <
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>
> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> >>>>>>>>>>>
> >>>>>>>>>>>>>>>>> [2]
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>
> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> >>>>>>>>>> <
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>
> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>>
> >>>>
> >>
>
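To make the mechanism discussed in this thread concrete, here is a minimal Java sketch of deriving a backlog flag from watermark lag, i.e. how far the current watermark trails wall-clock time, compared against a configured threshold. The class `WatermarkLagBacklogDetector` and its methods are hypothetical illustrations, not Flink or FLIP-328 APIs. Treating `Long.MIN_VALUE` as "no watermark yet" follows the convention that Flink's initial watermark is `Long.MIN_VALUE`; reporting zero lag (and thus no backlog) in that case is an assumption of this sketch.

```java
import java.time.Duration;

// Hypothetical helper, not a Flink API: flags backlog when the watermark
// trails wall-clock time by more than a configured threshold.
final class WatermarkLagBacklogDetector {
    // Sentinel for "no watermark received yet" (Flink's initial watermark value).
    static final long NO_WATERMARK = Long.MIN_VALUE;

    private final long thresholdMillis;

    WatermarkLagBacklogDetector(Duration threshold) {
        if (threshold.isNegative()) {
            throw new IllegalArgumentException("threshold must not be negative");
        }
        this.thresholdMillis = threshold.toMillis();
    }

    // Watermark lag = current wall-clock time - current watermark, clamped at zero.
    // Assumption: before the first watermark arrives, the lag is reported as zero.
    static long watermarkLagMillis(long nowMillis, long watermarkMillis) {
        if (watermarkMillis == NO_WATERMARK) {
            return 0L;
        }
        return Math.max(0L, nowMillis - watermarkMillis);
    }

    // Backlog while the lag is strictly greater than the threshold.
    boolean isProcessingBacklog(long nowMillis, long watermarkMillis) {
        return watermarkLagMillis(nowMillis, watermarkMillis) > thresholdMillis;
    }
}
```

For example, with a 5-minute threshold, a watermark 10 minutes behind wall-clock time yields `true`, while one a minute behind yields `false`. In a real job such a flag would still have to be combined with any backlog status set explicitly by the source, which is the interaction debated earlier in this thread.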


Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-07 Thread Leonard Xu
>>>>>>>>>>> watermark lag is 1 day, 1 hour or 1 second, there may be only 1 pending record, which means there is no backlog at all.
>>>>>>>>>>>
>>>>>>>>>>> Therefore, IMO, the watermark may not be the ideal metric for determining "isProcessingBacklog". What we need is something that reflects the number of records not yet processed by the job. Actually, that is the "pendingRecords" metric proposed in FLIP-33, which has been implemented by the Kafka source. Did you consider using the "pendingRecords" metric to determine "isProcessingBacklog"?
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Jark
>>>>>>>>>>>
>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
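Jark's objection can be illustrated with a toy comparison of the two signals: a watermark that is one day behind wall-clock time while only a single record is pending. The class `BacklogHeuristics` and both thresholds are hypothetical; only the metric name `pendingRecords` comes from FLIP-33.

```java
// Hypothetical comparison of two backlog heuristics discussed in this thread.
// Not a Flink API; both thresholds are caller-supplied assumptions.
final class BacklogHeuristics {
    private BacklogHeuristics() {}

    // Watermark-lag rule: backlog when the watermark trails wall-clock time by more
    // than the threshold. Precondition: watermarkMillis is a real watermark, not the
    // Long.MIN_VALUE placeholder, so the subtraction stays in range for realistic times.
    static boolean backlogByWatermarkLag(long nowMillis, long watermarkMillis, long lagThresholdMillis) {
        return nowMillis - watermarkMillis > lagThresholdMillis;
    }

    // Pending-records rule: backlog while more records are waiting than the threshold.
    static boolean backlogByPendingRecords(long pendingRecords, long pendingThreshold) {
        return pendingRecords > pendingThreshold;
    }
}
```

With `nowMillis = 86_400_000` (one day after the epoch), `watermarkMillis = 0` and a one-hour lag threshold, the first rule reports a backlog, while `pendingRecords = 1` against a threshold of, say, 1,000 does not. Which of the two better captures "backlog" is the question Jark raises.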



Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-07 Thread Jark Wu
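Xuannan's point that "lag" alone is ambiguous becomes clearer when the three source-side lags mentioned in the thread are written side by side. The formulas below reflect our reading of the FLIP-33 metric descriptions (FetchTime - EventTime, EmitTime - EventTime, and wall-clock time - watermark); the class `SourceLagMetrics` and the sample timestamps are hypothetical.

```java
// Hypothetical helper: the three source lags mentioned in this thread,
// computed from plain millisecond timestamps.
final class SourceLagMetrics {
    private SourceLagMetrics() {}

    // fetchEventTimeLag: age of a record at the moment the source fetched it.
    static long fetchEventTimeLag(long fetchTimeMillis, long eventTimeMillis) {
        return fetchTimeMillis - eventTimeMillis;
    }

    // emitEventTimeLag: age of a record at the moment the source emitted it downstream.
    static long emitEventTimeLag(long emitTimeMillis, long eventTimeMillis) {
        return emitTimeMillis - eventTimeMillis;
    }

    // watermarkLag: how far the watermark trails the current wall-clock time.
    static long watermarkLag(long currentTimeMillis, long watermarkMillis) {
        return currentTimeMillis - watermarkMillis;
    }
}
```

For a record with event time 1,000 ms that is fetched at 4,000 ms and emitted at 4,500 ms, while the watermark is 900 ms at wall-clock time 5,000 ms, the three values are 3,000, 3,500 and 4,100 ms respectively. A key that only says "lag-threshold" would leave users guessing which of these it is compared against.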

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-06 Thread Xuannan Su
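Xuannan's argument that the watermark copes with out-of-order data better than raw record timestamps can be illustrated with a standalone simulation. The watermark rule used here (largest timestamp seen, minus the allowed out-of-orderness, minus 1 ms) mirrors what Flink's built-in `BoundedOutOfOrdernessWatermarks` generator emits, but `OutOfOrderLagDemo` itself is hypothetical, has no Flink dependency, and does not model idleness.

```java
// Hypothetical simulation, no Flink dependency: compares a lag computed from the
// latest record timestamp with a lag computed from a bounded-out-of-orderness
// watermark (max timestamp seen - out-of-orderness - 1).
final class OutOfOrderLagDemo {
    private final long outOfOrdernessMillis;
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastTimestamp = Long.MIN_VALUE;

    OutOfOrderLagDemo(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
    }

    void onRecord(long timestampMillis) {
        lastTimestamp = timestampMillis;
        maxTimestamp = Math.max(maxTimestamp, timestampMillis);
    }

    // Monotonic: a late record never moves the watermark backwards.
    long currentWatermark() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no records yet; avoid underflow
        }
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    // Both lag methods assume at least one record has been seen.
    long lagFromLastRecord(long nowMillis) {
        return nowMillis - lastTimestamp;
    }

    long lagFromWatermark(long nowMillis) {
        return nowMillis - currentWatermark();
    }
}
```

With an out-of-orderness bound of 1,000 ms, a record at 10,000 ms followed by a late record at 2,000 ms, evaluated at 11,000 ms, gives a record-based lag of 9,000 ms (it jumps with every late record) but a watermark-based lag of 2,001 ms, because the watermark stays at 8,999 ms.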


Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-06 Thread Jing Ge
> > > >>>>> What we need is something that reflects the number of records not yet processed by the job.
> > > >>>>> Actually, that is the "pendingRecords" metric proposed in FLIP-33 [1], which has already been implemented by the Kafka source.
> > > >>>>> Did you consider using the "pendingRecords" metric to determine "isProcessingBacklog"?
> > > >>>>>
> > > >>>>> Best,
> > > >>>>> Jark
> > > >>>>>
> > > >>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > >>>>>
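To make Jark's alternative concrete, here is a minimal sketch of a count-based backlog check. It assumes a pending-record count is available per split (in the spirit of FLIP-33's "pendingRecords"); not every connector reports it, so the sketch returns `None` in that case. The function names, split ids and threshold are hypothetical illustrations, not Flink API.

```python
from typing import Dict, Optional


def total_pending_records(per_split: Dict[str, int]) -> int:
    """Sum the unprocessed-record counts reported for each split."""
    return sum(per_split.values())


def is_backlog_by_pending_records(pending_records: Optional[int], threshold: int) -> Optional[bool]:
    """Return True if more than `threshold` records are still waiting to be read.

    Returns None when the source does not report a pending-record count,
    so a caller could fall back to another strategy (e.g. watermark lag).
    """
    if pending_records is None:
        return None
    return pending_records > threshold


if __name__ == "__main__":
    # Hypothetical split ids and counts, for illustration only.
    pending = total_pending_records({"orders-0": 120_000, "orders-1": 95_000})
    print(pending, is_backlog_by_pending_records(pending, threshold=10_000))
    # 215000 True
```

Note that such a threshold is expressed in records rather than in time, whereas the FLIP discussed in this thread expresses it as a lag.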
> > > >>>>> On Tue, 15 Aug 2023 at 12:04, Xintong Song <tonysong...@gmail.com> wrote:
> > > >>>>>
> > > >>>>>> Sounds good to me.
> > > >>>>>>
> > > >>>>>> It is true that, if we introduce the generalized watermark, there will be other watermark-related concepts and configurations that need to be updated anyway.
> > > >>>>>>
> > > >>>>>> Best,
> > > >>>>>>
> > > >>>>>> Xintong
> > > >>>>>>
> > > >>>>>> On Tue, Aug 15, 2023 at 11:30 AM Xuannan Su <suxuanna...@gmail.com> wrote:
> > > >>>>>>
> > > >>>>>>> Hi Xintong,
> > > >>>>>>>
> > > >>>>>>> Thank you for your suggestion.
> > > >>>>>>>
> > > >>>>>>> After considering the idea of using a general configuration key, I think it may not be a good idea, for the reasons below.
> > > >>>>>>>
> > > >>>>>>> While I agree that a more general configuration key gives us the flexibility to switch to other approaches for calculating the lag in the future, the downside is that it may confuse users. We currently have fetchEventTimeLag, emitEventTimeLag, and watermarkLag in the source, and it would not be clear which specific lag we are referring to. With the potential introduction of the Generalized Watermark mechanism in the future, if I understand correctly, a watermark will not necessarily be a timestamp. I am concerned that a general configuration key may not be enough to cover all the use cases, and we will need to introduce a general way to determine the backlog status regardless.
> > > >>>>>>>
> > > >>>>>>> For the reasons above, I prefer introducing the configuration as is, and changing it later with a depre

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-06 Thread Xuannan Su
> > >>>>>>
> > >>>>>> Best,
> > >>>>>>
> > >>>>>> Xintong
> > >>>>>>
> > >>>>>> On Tue, Aug 15, 2023 at 11:30 AM Xuannan Su <suxuanna...@gmail.com> wrote:
> > >>>>>>
> > >>>>>>> Hi Xintong,
> > >>>>>>>
> > >>>>>>> Thank you for your suggestion.
> > >>>>>>>
> > >>>>>>> After considering the idea of using a general configuration key, I think it may not be a good idea, for the reasons below.
> > >>>>>>>
> > >>>>>>> While I agree that a more general configuration key gives us the flexibility to switch to other approaches for calculating the lag in the future, the downside is that it may confuse users. We currently have fetchEventTimeLag, emitEventTimeLag, and watermarkLag in the source, and it would not be clear which specific lag we are referring to. With the potential introduction of the Generalized Watermark mechanism in the future, if I understand correctly, a watermark will not necessarily be a timestamp. I am concerned that a general configuration key may not be enough to cover all the use cases, and we will need to introduce a general way to determine the backlog status regardless.
> > >>>>>>>
> > >>>>>>> For the reasons above, I prefer introducing the configuration as is, and changing it later through a deprecation or migration process. What do you think?
> > >>>>>>>
> > >>>>>>> Best,
> > >>>>>>> Xuannan
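To illustrate why a generic "lag" key could be ambiguous, the sketch below computes the three lags Xuannan lists for a single record. It assumes the FLIP-33 style definitions (fetch or emit time minus the record's event time) and, for watermarkLag, current time minus the source's current watermark; treat these definitions as assumptions for illustration. All names are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RecordTiming:
    event_time_ms: int  # timestamp carried by the record itself
    fetch_time_ms: int  # when the connector fetched the record from the external system
    emit_time_ms: int   # when the source operator emitted the record downstream


def fetch_event_time_lag_ms(r: RecordTiming) -> int:
    # Assumed FLIP-33 style definition: fetch time minus event time.
    return r.fetch_time_ms - r.event_time_ms


def emit_event_time_lag_ms(r: RecordTiming) -> int:
    # Assumed FLIP-33 style definition: emit time minus event time.
    return r.emit_time_ms - r.event_time_ms


def watermark_lag_ms(now_ms: int, current_watermark_ms: int) -> int:
    # Assumed definition: current time minus the source's current watermark.
    return now_ms - current_watermark_ms


if __name__ == "__main__":
    r = RecordTiming(event_time_ms=10_000, fetch_time_ms=10_500, emit_time_ms=10_600)
    print(fetch_event_time_lag_ms(r), emit_event_time_lag_ms(r), watermark_lag_ms(11_000, 8_000))
    # 500 600 3000
```

The same job can report three different "lags" at the same moment, which is the confusion a key that does not say which lag it compares against could cause.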
> > >>>>>>> On Aug 14, 2023, 14:09 +0800, Xintong Song <tonysong...@gmail.com>, wrote:
> > >>>>>>>> Thanks for the explanation.
> > >>>>>>>>
> > >>>>>>>> I wonder if it makes sense not to expose this detail via the configuration option. To be specific, I suggest not mentioning the "watermark" keyword in the configuration key and description.
> > >>>>>>>>
> > >>>>>>>> - From the users' perspective, I think they only need to know that if there is a lag higher than the given threshold, Flink will consider the latency of individual records less important and prioritize throughput over it. They don't really need the details of how the lags are calculated.
> > >>>>>>>> - For the internal implementation, I also think using watermark lags is a good idea, for the reasons you've already mentioned. However, it's not the only possible option. Hiding this detail from users would give us the flexibility to switch to other approaches in the future if needed.
> > >>>>>>>> - We are currently working on designing the ProcessFunction API (consider it a DataStream API V2). There's an idea to introduce a Generalized Watermark mechanism, where the watermark can be anything that needs to travel along the data flow with certain alignment strategies, and the event-time watermark would be one specific case of it. This is still an idea that has not been discussed or agreed on by the community, and we are preparing a FLIP for it. But if we go for it, the concept "watermark-lag-threshold" could be ambiguous.
> > >>>>>>>>
> > >>>>>>>> I do not intend to block the FLIP on this. I'd also be fine with introducing the configuration as is, and changing it later, if needed, with a regular deprecation and migration process. Just making my suggestions.
> > >>>>>>>>
> > >>>>>>>> Best,
> > >>>>>>>>
> > >>>>>>>> Xintong
> > >>>>>>>>
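Xintong's suggestion can be sketched as a small abstraction: the user-facing threshold only talks about "lag", while how the lag is measured sits behind a pluggable estimator, with watermark lag as one possible implementation. This is a hypothetical design sketch under those assumptions; none of these classes exist in Flink.

```python
from typing import Callable, Protocol


class LagEstimator(Protocol):
    """Anything that can report how far behind real time a source is, in ms."""

    def current_lag_ms(self) -> int: ...


class WatermarkLagEstimator:
    """One possible estimator: lag = current time - current watermark."""

    def __init__(self, clock_ms: Callable[[], int], watermark_ms: Callable[[], int]) -> None:
        self._clock_ms = clock_ms
        self._watermark_ms = watermark_ms

    def current_lag_ms(self) -> int:
        return self._clock_ms() - self._watermark_ms()


class BacklogDetector:
    """Compares an opaque lag with a user-facing threshold.

    The threshold says nothing about watermarks, so the estimator could be
    swapped later without renaming the user-facing option.
    """

    def __init__(self, lag_threshold_ms: int, estimator: LagEstimator) -> None:
        self._lag_threshold_ms = lag_threshold_ms
        self._estimator = estimator

    def is_processing_backlog(self) -> bool:
        return self._estimator.current_lag_ms() > self._lag_threshold_ms


if __name__ == "__main__":
    now = 1_000_000
    detector = BacklogDetector(
        lag_threshold_ms=60_000,
        estimator=WatermarkLagEstimator(clock_ms=lambda: now, watermark_ms=lambda: now - 300_000),
    )
    print(detector.is_processing_backlog())  # True: a 300 s lag exceeds the 60 s threshold
```

The trade-off Xuannan raises in his reply still applies: the more opaque the option, the less obvious it is to users which lag it compares against.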
> > >>>>>>>> On Mon, Aug 14, 2023 at 12:00 PM Xuannan Su <suxuanna...@gmail.com> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi Xintong,
> > >>>>>>>>>
> > >>>>>>>>> Thanks for the reply.
> > >>>>>>>>>
> > >>>>>>>>> I considered using the timestamps in the records to determine the backlog status, but decided to use the watermark in the end. By definition, the watermark indicates time progress in the data stream: it signals that the stream's event time has progressed to a specific time. Timestamps in the records, on the other hand, are usually used to generate the watermark. Therefore, it appears more appropriate and intuitive to calculate the event-time lag from the watermark and use it to determine the backlog status. And by using the watermark, we can easily deal with out-of-order and idle data.
> > >>>>>>>>>
> > >>>>>>>>> Please let me know if you have further questions.
> > >>>>>>>>>
> > >>>>>>>>> Best,
> > >>>>>>>>> Xuannan
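The out-of-order point can be shown with a short simulation. It compares a lag computed from each record's own timestamp with a lag computed from a watermark. The watermark follows the bounded-out-of-orderness rule (max timestamp seen minus the bound minus 1), modelled on Flink's bounded-out-of-orderness generator. "Now" is held fixed to isolate the effect of record order; the function name and numbers are hypothetical.

```python
from typing import List, Optional, Tuple


def compare_lags(
    timestamps_ms: List[int], now_ms: int, max_out_of_orderness_ms: int
) -> List[Tuple[int, int]]:
    """Return (record_timestamp_lag, watermark_lag) after each record.

    The watermark is max timestamp seen - bound - 1, so it never moves
    backwards, while the record-based lag follows every late record.
    """
    result: List[Tuple[int, int]] = []
    max_ts: Optional[int] = None
    for ts in timestamps_ms:
        max_ts = ts if max_ts is None else max(max_ts, ts)
        watermark = max_ts - max_out_of_orderness_ms - 1
        result.append((now_ms - ts, now_ms - watermark))
    return result


if __name__ == "__main__":
    threshold_ms = 5_000
    # The third record (4_000) arrives late and out of order.
    for record_lag, wm_lag in compare_lags([10_000, 10_500, 4_000, 11_000], 12_000, 1_000):
        print(record_lag > threshold_ms, wm_lag > threshold_ms)
```

With a 5 s threshold, the record-based check flips to "backlog" for the single late record and back again, while the watermark-based check stays stable.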
> > >>>>>>>>> On Aug 10, 2023, 20:23 +0800, Xintong Song <tonysong...@gmail.com>, wrote:
> > >>>>>>>>>> Thanks for preparing the FLIP, Xuannan.
> > >>>>>>>>>>
> > >>>>>>>>>> +1 in general.
> > >>>>>>>>>>
> > >>>>>>>>>> A quick question: could you explain why we are relying on the watermark for emitting the record attribute? Why not use timestamps in the records? I don't see any concern in using watermarks. Just wondering if there are any deeper considerations behind this.
> > >>>>>>>>>>
> > >>>>>>>>>> Best,
> > >>>>>>>>>>
> > >>>>>>>>>> Xintong
> > >>>>>>>>>>
> > >>>>>>>>>> On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su <suxuanna...@gmail.com> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hi all,
> > >>>>>>>>>>>
> > >>>>>>>>>>> I am opening this thread to discuss FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag [1]. We had several discussions with Dong Lin about the design; thanks for all the valuable advice.
> > >>>>>>>>>>>
> > >>>>>>>>>>> The FLIP targets the use case where users want to run a Flink job that backfills historical data with high throughput and then continues processing real-time data with low latency. Building upon the backlog concept introduced in FLIP-309 [2], this proposal enables sources to report their backlog-processing status based on the watermark lag.
> > >>>>>>>>>>>
> > >>>>>>>>>>> We would greatly appreciate any comments or feedback you may have on this proposal.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Best,
> > >>>>>>>>>>> Xuannan
> > >>>>>>>>>>>
> > >>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> > >>>>>>>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> >
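The core idea of the proposal can be sketched as a threshold check on watermark lag. The sketch uses it to pick between a regular and a larger checkpoint interval, which is the FLIP-309 use case named in reference [2]. It is a simplified sketch under stated assumptions: lag is taken as current time minus watermark, the threshold plays the role of the "watermark-lag-threshold" mentioned in this thread, and the function names and interval values are hypothetical.

```python
from typing import Iterable, List, Tuple


def is_processing_backlog(now_ms: int, watermark_ms: int, lag_threshold_ms: int) -> bool:
    # Backlog while the watermark trails the current time by more than the threshold.
    return now_ms - watermark_ms > lag_threshold_ms


def checkpoint_interval_ms(backlog: bool, regular_ms: int, during_backlog_ms: int) -> int:
    # Larger interval while backfilling (throughput), regular one afterwards (latency).
    return during_backlog_ms if backlog else regular_ms


def trace(
    samples: Iterable[Tuple[int, int]],
    lag_threshold_ms: int,
    regular_ms: int,
    during_backlog_ms: int,
) -> List[Tuple[bool, int]]:
    """For each (now_ms, watermark_ms) sample, return (backlog, checkpoint interval)."""
    out: List[Tuple[bool, int]] = []
    for now_ms, watermark_ms in samples:
        backlog = is_processing_backlog(now_ms, watermark_ms, lag_threshold_ms)
        out.append((backlog, checkpoint_interval_ms(backlog, regular_ms, during_backlog_ms)))
    return out


if __name__ == "__main__":
    # The watermark catches up with real time as the historical backfill completes.
    samples = [(100_000, 0), (101_000, 30_000), (102_000, 95_000), (103_000, 102_500)]
    print(trace(samples, lag_threshold_ms=60_000, regular_ms=60_000, during_backlog_ms=600_000))
    # [(True, 600000), (True, 600000), (False, 60000), (False, 60000)]
```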


Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-04 Thread Jing Ge
> [...]


Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-31 Thread Xuannan Su
> [...]


Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-30 Thread Xuannan Su
> [...]

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-30 Thread Jing Ge
> [...]
> >>>
> >>>>>>>>> [2]
> >>>>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> >> <
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
>
>
>
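A minimal sketch of the idea in the FLIP-328 announcement quoted above: a source reports isProcessingBacklog=true while its watermark trails wall-clock time by more than a configured threshold. The class WatermarkLagBacklogDetector, its methods, the strict "greater than" comparison, and the treatment of Long.MIN_VALUE as "no watermark yet" are all hypothetical choices for illustration, not Flink's actual API or the FLIP's exact semantics.

```java
/**
 * Hypothetical sketch (not Flink's API): decides whether a source should report
 * isProcessingBacklog=true based on how far its watermark lags wall-clock time.
 */
final class WatermarkLagBacklogDetector {
    /** Assumption: Long.MIN_VALUE means "no watermark emitted yet". */
    static final long NO_WATERMARK = Long.MIN_VALUE;

    private final long thresholdMillis;

    WatermarkLagBacklogDetector(long thresholdMillis) {
        if (thresholdMillis < 0) {
            throw new IllegalArgumentException("threshold must be non-negative");
        }
        this.thresholdMillis = thresholdMillis;
    }

    /** Watermark lag = wall-clock time minus current watermark, both in epoch millis. */
    long watermarkLag(long currentWatermark, long nowMillis) {
        return nowMillis - currentWatermark;
    }

    /** Backlog only if the lag strictly exceeds the threshold; no watermark yet means not backlog (assumption). */
    boolean isProcessingBacklog(long currentWatermark, long nowMillis) {
        if (currentWatermark == NO_WATERMARK) {
            return false;
        }
        return watermarkLag(currentWatermark, nowMillis) > thresholdMillis;
    }

    public static void main(String[] args) {
        WatermarkLagBacklogDetector detector = new WatermarkLagBacklogDetector(60_000L); // 1 minute
        long now = 1_700_000_000_000L;
        // Backfilling data from an hour ago: lag of 3,600,000 ms exceeds 60,000 ms.
        System.out.println(detector.isProcessingBacklog(now - 3_600_000L, now)); // true
        // Caught up: watermark only 5 s behind wall-clock time.
        System.out.println(detector.isProcessingBacklog(now - 5_000L, now)); // false
    }
}
```

In a real source the "now" value would come from a clock and the watermark from the source's watermark generation; both are passed in explicitly here so the decision stays a pure function that is easy to test.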

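To illustrate the argument in Xuannan's reply above about relying on watermarks rather than record timestamps, the hypothetical comparison below feeds the same out-of-order records through two backlog checks. It assumes a simplified watermark (maximum timestamp seen so far minus a fixed out-of-orderness bound) and a frozen wall clock; idleness handling is not modeled. The class and method names are made up for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical comparison: backlog decisions driven by per-record timestamps
 * versus by a simplified bounded-out-of-orderness watermark.
 */
final class RecordVsWatermarkLag {
    /** Simplified watermark: max timestamp seen so far minus a fixed bound. */
    static final class SimpleWatermarkTracker {
        private final long maxOutOfOrdernessMillis;
        private long maxTimestampSeen = Long.MIN_VALUE;

        SimpleWatermarkTracker(long maxOutOfOrdernessMillis) {
            this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        }

        /** Records an event timestamp and returns the resulting, never decreasing, watermark. */
        long onRecord(long eventTimestamp) {
            maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestamp);
            return maxTimestampSeen - maxOutOfOrdernessMillis;
        }
    }

    /** Backlog flags computed from each record's own timestamp. */
    static List<Boolean> recordBased(long[] timestamps, long now, long threshold) {
        List<Boolean> flags = new ArrayList<>();
        for (long ts : timestamps) {
            flags.add(now - ts > threshold);
        }
        return flags;
    }

    /** Backlog flags computed from the watermark after each record. */
    static List<Boolean> watermarkBased(long[] timestamps, long now, long threshold, long bound) {
        SimpleWatermarkTracker tracker = new SimpleWatermarkTracker(bound);
        List<Boolean> flags = new ArrayList<>();
        for (long ts : timestamps) {
            long watermark = tracker.onRecord(ts);
            flags.add(now - watermark > threshold);
        }
        return flags;
    }

    public static void main(String[] args) {
        long now = 1_000L;       // wall-clock time frozen for the example
        long threshold = 900L;   // backlog if lag > 900 ms
        long[] timestamps = {200L, 300L, 50L, 400L}; // 50 arrives out of order
        System.out.println(recordBased(timestamps, now, threshold));         // [false, false, true, false]
        System.out.println(watermarkBased(timestamps, now, threshold, 50L)); // [false, false, false, false]
    }
}
```

The single late record makes the record-based status flip to backlog and back, while the watermark never moves backwards, so the watermark-based status stays stable.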

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-30 Thread Hang Ruan


Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-29 Thread Xuannan Su




Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-27 Thread Jing Ge


Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-21 Thread Xuannan Su
 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-20 Thread Jark Wu


Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-14 Thread Xintong Song
Sounds good to me.

It is true that, if we are introducing the generalized watermark, there
will be other watermark-related concepts and configurations that need to be
updated anyway.


Best,

Xintong



On Tue, Aug 15, 2023 at 11:30 AM Xuannan Su wrote:

> Hi Xintong,
>
> Thank you for your suggestion.
>
> After considering the idea of using a general configuration key, I think
> it may not be a good idea for the reasons below.
>
> While I agree that using a more general configuration key provides us with
> the flexibility to switch to other approaches to calculate the lag in the
> future, the downside is that it may cause confusion for users. We currently
> have fetchEventTimeLag, emitEventTimeLag, and watermarkLag in the source,
> and it is not clear which specific lag we are referring to. With the
> potential introduction of the Generalized Watermark mechanism in the
> future, if I understand correctly, a watermark won't necessarily need to be
> a timestamp. I am concerned that the general configuration key may not be
> enough to cover all the use cases, and we will need to introduce a general
> way to determine the backlog status regardless.
>
> For the reasons above, I prefer introducing the configuration as is, and
> changing it later through a deprecation or migration process. What
> do you think?
>
> Best,
> Xuannan

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-14 Thread Xuannan Su
Hi Xintong,

Thank you for your suggestion.

After considering the idea of using a general configuration key, I think it may
not be a good idea for the reasons below.

While I agree that using a more general configuration key provides us with the
flexibility to switch to other approaches to calculate the lag in the future,
the downside is that it may cause confusion for users. We currently have
fetchEventTimeLag, emitEventTimeLag, and watermarkLag in the source, and it is
not clear which specific lag we are referring to. With the potential
introduction of the Generalized Watermark mechanism in the future, if I
understand correctly, a watermark won't necessarily need to be a timestamp. I
am concerned that the general configuration key may not be enough to cover all
the use cases, and we will need to introduce a general way to determine the
backlog status regardless.

For the reasons above, I prefer introducing the configuration as is, and
changing it later through a deprecation or migration process. What do you think?

Best,
Xuannan
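A hypothetical illustration of why a generic "lag" key could be ambiguous: the three lags named in the message above are measured against different reference points and can differ for the same record at the same moment. The definitions below are simplified assumptions for illustration only, not Flink's metric implementations.

```java
/**
 * Hypothetical illustration of three different "lags" a source could expose.
 * The definitions are simplified assumptions modeled on the metric names.
 */
final class SourceLags {
    /** Fetch lag: how long after its event time a record was fetched from the external system. */
    static long fetchEventTimeLag(long eventTime, long fetchTime) {
        return fetchTime - eventTime;
    }

    /** Emit lag: how long after its event time a record was emitted by the source operator. */
    static long emitEventTimeLag(long eventTime, long emitTime) {
        return emitTime - eventTime;
    }

    /** Watermark lag: how far the source's watermark trails wall-clock time. */
    static long watermarkLag(long currentWatermark, long now) {
        return now - currentWatermark;
    }

    public static void main(String[] args) {
        long eventTime = 10_000L; // record's event timestamp
        long fetchTime = 12_000L; // fetched 2 s after the event
        long emitTime = 12_500L;  // emitted 0.5 s after the fetch
        long watermark = 7_000L;  // held back by an assumed 3 s out-of-orderness bound
        long now = 12_500L;
        System.out.println(fetchEventTimeLag(eventTime, fetchTime)); // 2000
        System.out.println(emitEventTimeLag(eventTime, emitTime));   // 2500
        System.out.println(watermarkLag(watermark, now));            // 5500
    }
}
```

A threshold of, say, 3 seconds would classify this moment as backlog under the watermark lag but not under either record-level lag, which is the kind of confusion a key that does not name the lag could invite.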
On Aug 14, 2023, 14:09 +0800, Xintong Song wrote:
> Thanks for the explanation.
>
> I wonder if it makes sense to not expose this detail via the configuration
> option. To be specific, I suggest not mentioning the "watermark" keyword in
> the configuration key and description.
>
> - From the users' perspective, I think they only need to know that if
> there's a lag higher than the given threshold, Flink will consider the
> latency of individual records less important and prioritize throughput
> over it. They don't really need the details of how the lags are calculated.
> - For the internal implementation, I also think using watermark lags is
> a good idea, for the reasons you've already mentioned. However, it's not
> the only possible option. Hiding this detail from users would give us the
> flexibility to switch to other approaches if needed in the future.
> - We are currently working on designing the ProcessFunction API
> (consider it as a DataStream API V2). There's an idea to introduce a
> Generalized Watermark mechanism, where basically the watermark can be
> anything that needs to travel along the data-flow with certain alignment
> strategies, and event time watermark would be one specific case of it. This
> is still an idea and has not been discussed and agreed on by the community,
> and we are preparing a FLIP for it. But if we are going for it, the concept
> "watermark-lag-threshold" could be ambiguous.
>
> I do not intend to block the FLIP on this. I'd also be fine with
> introducing the configuration as is, and changing it later, if needed, with
> a regular deprecation and migration process. Just making my suggestions.
>
>
> Best,
>
> Xintong

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-14 Thread Xintong Song
Thanks for the explanation.

I wonder if it makes sense not to expose this detail via the configuration
option. Specifically, I suggest not mentioning the "watermark" keyword in
the configuration key or description.

   - From the users' perspective, I think they only need to know that when
   the lag exceeds the given threshold, Flink will consider the latency of
   individual records less important and prioritize throughput over it.
   They don't really need the details of how the lag is calculated.
   - For the internal implementation, I also think using watermark lag is
   a good idea, for the reasons you've already mentioned. However, it's not
   the only possible option. Hiding this detail from users would give us the
   flexibility to switch to other approaches in the future if needed.
   - We are currently working on designing the ProcessFunction API
   (think of it as DataStream API V2). There's an idea to introduce a
   Generalized Watermark mechanism, where basically the watermark can be
   anything that needs to travel along the data-flow with certain alignment
   strategies, and event time watermark would be one specific case of it. This
   is still an idea and has not been discussed and agreed on by the community,
   and we are preparing a FLIP for it. But if we are going for it, the concept
   "watermark-lag-threshold" could be ambiguous.
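To illustrate the first two points, here is a minimal Java sketch in which the user-facing knob is only a lag threshold, while the watermark-based lag computation is one interchangeable implementation behind it. All names (`LagEstimator`, `WatermarkLagEstimator`, `BacklogDetector`) are hypothetical and are not Flink APIs; treating "no watermark yet" as "not backlog" is also an assumption of this sketch.

```java
import java.time.Duration;
import java.util.OptionalLong;

/** Hypothetical interface: how the lag is measured stays an internal detail. */
interface LagEstimator {
    /** Current lag in milliseconds, or empty if it cannot be estimated yet. */
    OptionalLong lagMillis(long nowMillis);
}

/** One possible estimator: wall-clock time minus the latest watermark. */
final class WatermarkLagEstimator implements LagEstimator {
    private boolean seenWatermark = false;
    private long currentWatermark;

    void onWatermark(long watermarkMillis) {
        // Watermarks should not go backwards; keep the maximum seen so far.
        currentWatermark = seenWatermark
                ? Math.max(currentWatermark, watermarkMillis)
                : watermarkMillis;
        seenWatermark = true;
    }

    @Override
    public OptionalLong lagMillis(long nowMillis) {
        return seenWatermark
                ? OptionalLong.of(nowMillis - currentWatermark)
                : OptionalLong.empty();
    }
}

/** Only knows about a "lag threshold"; any LagEstimator can be plugged in. */
final class BacklogDetector {
    private final Duration lagThreshold;
    private final LagEstimator estimator;

    BacklogDetector(Duration lagThreshold, LagEstimator estimator) {
        this.lagThreshold = lagThreshold;
        this.estimator = estimator;
    }

    boolean isProcessingBacklog(long nowMillis) {
        OptionalLong lag = estimator.lagMillis(nowMillis);
        // With no lag estimate yet, this sketch does not report backlog.
        return lag.isPresent() && lag.getAsLong() > lagThreshold.toMillis();
    }
}
```

Replacing `WatermarkLagEstimator` with another `LagEstimator` would not change what the user configures, which is the flexibility argued for above.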

I do not intend to block the FLIP on this. I'd also be fine with
introducing the configuration as is, and changing it later, if needed, with
a regular deprecation and migration process. Just making my suggestions.


Best,

Xintong



On Mon, Aug 14, 2023 at 12:00 PM Xuannan Su  wrote:

> Hi Xintong,
>
> Thanks for the reply.
>
> I considered using the timestamps in the records to determine the backlog
> status, but in the end decided to use the watermark. By definition, the
> watermark indicates the progress of time in the data stream: it signals
> that the stream’s event time has advanced to a specific point. The
> timestamps in the records, on the other hand, are usually what the
> watermark is generated from. It therefore seems more appropriate and
> intuitive to compute the event-time lag from the watermark and derive the
> backlog status from it. Using the watermark also lets us easily handle
> out-of-order and idle data.
>
> Please let me know if you have further questions.
>
> Best,
> Xuannan
> On Aug 10, 2023, 20:23 +0800, Xintong Song , wrote:
> > Thanks for preparing the FLIP, Xuannan.
> >
> > +1 in general.
> >
> > A quick question: could you explain why we are relying on the watermark
> > for emitting the record attribute? Why not use the timestamps in the
> > records? I don't see any concern with using watermarks; I'm just
> > wondering if there are any deeper considerations behind this.
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su  wrote:
> >
> > > Hi all,
> > >
> > > I am opening this thread to discuss FLIP-328: Allow source operators to
> > > determine isProcessingBacklog based on watermark lag[1]. We had several
> > > discussions with Dong Lin about the design; thanks to him for all the
> > > valuable advice.
> > >
> > > The FLIP targets the use case where users want to run a Flink job to
> > > backfill historical data with high throughput and then continue
> > > processing real-time data with low latency. Building upon the backlog
> > > concept introduced in FLIP-309[2], this proposal enables sources to
> > > report their backlog status based on the watermark lag.
> > >
> > > We would greatly appreciate any comments or feedback you may have on
> > > this proposal.
> > >
> > > Best,
> > > Xuannan
> > >
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> > > [2]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > >
>


Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-13 Thread Xuannan Su
Hi Xintong,

Thanks for the reply.

I considered using the timestamps in the records to determine the backlog
status, but in the end decided to use the watermark. By definition, the
watermark indicates the progress of time in the data stream: it signals that
the stream’s event time has advanced to a specific point. The timestamps in
the records, on the other hand, are usually what the watermark is generated
from. It therefore seems more appropriate and intuitive to compute the
event-time lag from the watermark and derive the backlog status from it.
Using the watermark also lets us easily handle out-of-order and idle data.
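A minimal Java sketch of the out-of-order argument, assuming a bounded-out-of-orderness watermark (the maximum timestamp seen so far minus a fixed bound) and made-up timestamps. It compares a per-record backlog decision with a watermark-based one; idleness is not modeled. `LagComparison` and its methods are hypothetical and not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical comparison of two ways to decide "processing backlog". */
final class LagComparison {
    /** Decision per record when each record's own timestamp defines the lag. */
    static List<Boolean> byRecordTimestamp(
            long[] timestamps, long nowMillis, long thresholdMillis) {
        List<Boolean> result = new ArrayList<>();
        for (long ts : timestamps) {
            result.add(nowMillis - ts > thresholdMillis);
        }
        return result;
    }

    /** Decision per record when a bounded-out-of-orderness watermark defines the lag. */
    static List<Boolean> byWatermark(
            long[] timestamps, long nowMillis, long thresholdMillis, long outOfOrdernessMillis) {
        List<Boolean> result = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE;
        for (long ts : timestamps) {
            // A late record cannot lower maxTimestamp, so the watermark never decreases.
            maxTimestamp = Math.max(maxTimestamp, ts);
            long watermark = maxTimestamp - outOfOrdernessMillis;
            result.add(nowMillis - watermark > thresholdMillis);
        }
        return result;
    }
}
```

With timestamps {100, 200, 50, 300, 120}, a clock at 1000 ms, a threshold of 850 ms and a bound of 100 ms, the per-record decision alternates true, false, true, false, true, while the watermark-based decision is true, true, true, false, false: it switches once and stays switched.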

Please let me know if you have further questions.

Best,
Xuannan
On Aug 10, 2023, 20:23 +0800, Xintong Song , wrote:
> Thanks for preparing the FLIP, Xuannan.
>
> +1 in general.
>
> A quick question: could you explain why we are relying on the watermark for
> emitting the record attribute? Why not use the timestamps in the records? I
> don't see any concern with using watermarks; I'm just wondering if there are
> any deeper considerations behind this.
>
> Best,
>
> Xintong
>
>
>
> On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su  wrote:
>
> > Hi all,
> >
> > I am opening this thread to discuss FLIP-328: Allow source operators to
> > determine isProcessingBacklog based on watermark lag[1]. We had several
> > discussions with Dong Lin about the design; thanks to him for all the
> > valuable advice.
> >
> > The FLIP targets the use case where users want to run a Flink job to
> > backfill historical data with high throughput and then continue
> > processing real-time data with low latency. Building upon the backlog
> > concept introduced in FLIP-309[2], this proposal enables sources to report
> > their backlog status based on the watermark lag.
> >
> > We would greatly appreciate any comments or feedback you may have on this
> > proposal.
> >
> > Best,
> > Xuannan
> >
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> > [2]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> >


Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-10 Thread Xintong Song
Thanks for preparing the FLIP, Xuannan.

+1 in general.

A quick question: could you explain why we are relying on the watermark for
emitting the record attribute? Why not use the timestamps in the records? I
don't see any concern with using watermarks; I'm just wondering if there are
any deeper considerations behind this.

Best,

Xintong



On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su  wrote:

> Hi all,
>
> I am opening this thread to discuss FLIP-328: Allow source operators to
> determine isProcessingBacklog based on watermark lag[1]. We had several
> discussions with Dong Lin about the design; thanks to him for all the
> valuable advice.
>
> The FLIP targets the use case where users want to run a Flink job to
> backfill historical data with high throughput and then continue processing
> real-time data with low latency. Building upon the backlog concept
> introduced in FLIP-309[2], this proposal enables sources to report their
> backlog status based on the watermark lag.
>
> We would greatly appreciate any comments or feedback you may have on this
> proposal.
>
> Best,
> Xuannan
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
>


[DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-03 Thread Xuannan Su
Hi all,

I am opening this thread to discuss FLIP-328: Allow source operators to
determine isProcessingBacklog based on watermark lag[1]. We had several
discussions with Dong Lin about the design; thanks to him for all the
valuable advice.

The FLIP targets the use case where users want to run a Flink job to
backfill historical data with high throughput and then continue processing
real-time data with low latency. Building upon the backlog concept
introduced in FLIP-309[2], this proposal enables sources to report their
backlog status based on the watermark lag.
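To make the backfill-then-real-time use case concrete, here is a hedged Java sketch that ties the two FLIPs together. It derives the backlog status from watermark lag and then, following the idea in FLIP-309's title, picks a larger checkpoint interval while backlog is being processed. The class, method, threshold and interval values are illustrative assumptions, not Flink configuration options or APIs.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: checkpoint interval chosen from watermark-lag-based backlog status. */
final class BackfillThenRealtime {
    static List<Duration> checkpointIntervals(
            long[] nowMillis,
            long[] watermarkMillis,
            long lagThresholdMillis,
            Duration regularInterval,
            Duration intervalDuringBacklog) {
        if (nowMillis.length != watermarkMillis.length) {
            throw new IllegalArgumentException("expected one watermark per sample");
        }
        List<Duration> intervals = new ArrayList<>();
        for (int i = 0; i < nowMillis.length; i++) {
            // Backlog while the watermark trails wall-clock time by more than the threshold.
            boolean isProcessingBacklog = nowMillis[i] - watermarkMillis[i] > lagThresholdMillis;
            // Larger interval during backlog favors throughput; smaller one favors latency.
            intervals.add(isProcessingBacklog ? intervalDuringBacklog : regularInterval);
        }
        return intervals;
    }
}
```

In a run where the watermark starts far behind and then catches up, the first samples get the larger interval (backfill, throughput first) and later samples fall back to the regular one (real-time, latency first).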

We would greatly appreciate any comments or feedback you may have on this 
proposal.

Best,
Xuannan


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog