Hi Jark,

Please see my comments inline.

On Mon, Sep 11, 2023 at 4:41 PM Jark Wu <imj...@gmail.com> wrote:

> Hi Dong,
>
> Please see my comments inline.
>
> >  As a result, the proposed job-level
> > config will be applied only in the changelog stage. So there is no
> > difference between these two approaches in this particular case, right?
>
> How the job-level config can be applied ONLY in the changelog stage?
>

Actually, the job-level config is *not* applicable only in the changelog
stage. It only depends on the watermark lag of source operators. Therefore,
it is also applicable to the snapshot stage (or any other stage) as long as
watermark lag is well-defined in that stage.


> I think it is only possible if it is implemented by the CDC source itself,
> because the framework doesn't know which stage of the source is.
> Know that the CDC source may emit watermarks with a very small lag
> in the snapshot stage, and the job-level config may turn the backlog
> status into false.
>

Just to clarify, the job-level config introduced in FLIP-328 will *not* set
backlog to false even if the CDC source emits watermarks with a very small
lag in the snapshot stage.

This is because the job-level config introduced in FLIP-328 is only
effective (i.e. set backlog to true) when watermark lag is high. Given that
CDC source itself has an extra rule that sets backlog to true in the
snapshot stage, that means the backlog will be true even if its watermark
lag is small.

Would this address your concern here?


>
> > On the other hand, per-source config will be necessary if users want to
> > apply different watermark lag thresholds for different sources in the
> same
> > job.
>
> We also have different watermark delay definitions for each source,
> I think this's also reasonable and necessary to have different watermark
> lags.
>

Hmm.. can you explain what you mean by "different watermark delay
definitions for each source"?

According to FLIP-33
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics>,
watermark lag is defined as "the time in milliseconds that the watermark
lags behind the wall clock time". This definition has the same semantics
and value type regardless of the specific source definition.
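
To make the FLIP-33 definition concrete, here is a minimal Java sketch of
the computation and of a threshold check on top of it. The class and method
names (WatermarkLagSketch, watermarkLagMs, exceedsThreshold) are
hypothetical and only for illustration; they are not Flink APIs, and the
strict ">" comparison is an assumption.

```java
import java.time.Duration;

/** Illustration only: hypothetical helper, not part of Flink, FLIP-33 or FLIP-328. */
final class WatermarkLagSketch {

    /** Watermark lag as defined in FLIP-33: wall-clock time minus the watermark, in ms. */
    static long watermarkLagMs(long wallClockMs, long watermarkMs) {
        return wallClockMs - watermarkMs;
    }

    /** True if the lag is strictly above the threshold (the strict comparison is assumed). */
    static boolean exceedsThreshold(long wallClockMs, long watermarkMs, Duration threshold) {
        return watermarkLagMs(wallClockMs, watermarkMs) > threshold.toMillis();
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // A watermark that is 90 minutes behind the wall clock, checked against a 1-hour threshold.
        long watermark = now - Duration.ofMinutes(90).toMillis();
        System.out.println(exceedsThreshold(now, watermark, Duration.ofHours(1)));
    }
}
```

The computation takes only the wall-clock time and the watermark as inputs,
so it does not depend on which source produced the watermark.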

Do you mean to introduce another definition of watermark lag? If so, can
you explain the use-case and the definitions, so that we can better gauge
whether we should update FLIP-328 based on this idea?


>
> > Each source can have its own rule that specifies when the backlog can be
> true
> > (e.g. MySql CDC says the backlog should be true during the snapshot
> stage).
> > And we can have a job-level config that specifies when the backlog should
> > be true. Note that it is designed in such a way that none of these rules
> > specify when the backlog should be false. That is why there is no
> conflict
> > by definition.
>
> IIUC, FLIP-309 provides `setIsProcessingBacklog` to specify when the
> backlog
> is true and when is FALSE. This conflicts with the job-level config as it
>

I think there is probably a misunderstanding here. FLIP-309 does NOT directly
specify when backlog is false. It is intentionally specified in such a way
that there will not be any conflict between these rules.

More specifically, it is mentioned in the Java doc
of setIsProcessingBacklog that "If no API has been explicitly invoked to
specify the backlog status of a source, the source is considered to have
isProcessingBacklog=false by default".

The job-level config introduced in FLIP-328 is also considered an API. So
here is how it works following the definitions in FLIP-309:
if setIsProcessingBacklog(true) is explicitly invoked for a source, or if
the job-level config causes the backlog status to be set to true, then the
overall backlog status is set to true. Otherwise, it is set to false.

We can update the Java doc of setIsProcessingBacklog() to make this
definition/logic clearer.
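
As a rough sketch of the combination logic described above (not the actual
FLIP-309/FLIP-328 implementation): the class, method and parameter names
below are hypothetical, and using null for "lag undefined" or "threshold
not configured" is an assumption made for the example.

```java
/** Illustration only: hypothetical names, not the FLIP-309/FLIP-328 implementation. */
final class BacklogStatusSketch {

    /**
     * Combines the per-source rule and the job-level watermark-lag rule with a logical OR.
     *
     * @param sourceReportedBacklog true if the source explicitly invoked setIsProcessingBacklog(true)
     * @param watermarkLagMs        current watermark lag in ms, or null if it is undefined
     * @param lagThresholdMs        job-level threshold in ms, or null if not configured
     */
    static boolean isProcessingBacklog(
            boolean sourceReportedBacklog, Long watermarkLagMs, Long lagThresholdMs) {
        // The job-level rule can only vote "true"; it never forces the status to false.
        boolean lagRuleSaysBacklog =
                watermarkLagMs != null
                        && lagThresholdMs != null
                        && watermarkLagMs > lagThresholdMs;
        // No rule voted "true" => the default, isProcessingBacklog=false.
        return sourceReportedBacklog || lagRuleSaysBacklog;
    }

    public static void main(String[] args) {
        // CDC snapshot stage with a tiny watermark lag: the source's own rule still wins.
        System.out.println(isProcessingBacklog(true, 5L, 60_000L));
        // Changelog stage with a small lag: no rule votes "true".
        System.out.println(isProcessingBacklog(false, 5L, 60_000L));
    }
}
```

Since each rule can only vote for backlog=true, a CDC source in the
snapshot stage stays in backlog even when its watermark lag is below the
job-level threshold.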

Would this address your concern?


> will turn
> the status into true.
>
> > If I understand your comments correctly, you mean that we might have a
> > Flink SQL DDL with user-defined watermark expressions. And users also
> want
> > to set the backlog to true if the watermark generated by that
> > user-specified expression exceeds a threshold.
>
> No. I mean the source may not support generating watermarks, so the
> watermark
> expression is applied in a following operator (instead of in the source
> operator).
> This will result in the watermark lag doesn't work in this case and confuse
> users.
>

Can you provide a specific example / use-case that needs users to generate
a watermark in the following operator for a source that does not support
generating watermarks?

Depending on the concrete use-case, maybe the right solution is to update
the source to support generating watermarks.


> > You are right that this is a limitation. However, this is only a
> short-term
> > limitation which we added to make sure that we can focus on the
> capability
> > to switch from backlog=true to backlog=false. In the future, we will
> remove
> > this limitation and also support switching from backlog=false to
> > backlog=true.
>
> I can understand it may be difficult to support runtime mode switching back
> and forth.
> However, I think this should be a limitation of FLIP-327, not FLIP-328.
> IIUC,
> FLIP-309 doesn't have this limitation, right? I just don't understand
> what's the
> challenge to switch a flag?
>

Instead of thinking from a developer's perspective, it might be better to
compare these two approaches based on which approach is more user-friendly.

Note that there is no downside to introducing this limitation in FLIP-328.
This is because there is no use-case for switching the backlog from false
to true until Flink runtime can utilize this change to improve its
performance, which is the case in the near future.

Here are the advantages of introducing this limitation in FLIP-328 compared
to introducing this limitation in FLIP-327:

1) Flink users can know that a Flink job will NOT run in backlog=true mode
once the job is running in backlog=false mode. Otherwise, users may be
expecting the job to run with higher throughput when watermark lag goes up.

It might be useful to note that FLIP-327 does not introduce any API that is
used directly by end users. So it is not straightforward to specify this
limitation for end users in FLIP-327.

2) Developers of Flink operators who want to take advantage of the APIs
introduced in FLIP-327 (e.g. processRecordAttributes1) know that they don't
need to handle the case where backlog switches from false to true.

Although we can move the limitation from FLIP-328 to FLIP-327, it will just
become harder to explain to Flink developers that "source can switch
backlog from false to true but you don't need to handle this situation in
these APIs".

I hope the above reasoning can explain why it is simpler to introduce this
limitation in FLIP-328. Please be aware that this is just a short-term
limitation that aims to minimize the back-and-forth change of APIs while
making the semantics clear to end users.
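
To illustrate what this short-term limitation means in practice, here is a
small sketch of a one-way latch. It is only one possible way to picture the
rule, assuming the job may start with backlog=true; the class and method
names are hypothetical and not part of FLIP-327 or FLIP-328.

```java
/** Illustration only: hypothetical sketch of the one-way backlog switch. */
final class OneWayBacklogSketch {

    // Becomes true the first time the status is false, and never resets.
    private boolean switchedToNonBacklog = false;

    /**
     * @param requestedBacklog what the backlog rules currently ask for
     * @return the effective backlog status after applying the one-way limitation
     */
    boolean update(boolean requestedBacklog) {
        if (!requestedBacklog) {
            switchedToNonBacklog = true;
        }
        // Once backlog=false has been reached, a later request for true is ignored.
        return requestedBacklog && !switchedToNonBacklog;
    }

    public static void main(String[] args) {
        OneWayBacklogSketch status = new OneWayBacklogSketch();
        System.out.println(status.update(true));   // job starts by processing backlog
        System.out.println(status.update(false));  // caught up with real-time data
        System.out.println(status.update(true));   // later lag spike: request is ignored
    }
}
```

With this rule, operators only ever observe a true-to-false transition.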

Best,
Dong


> Best,
> Jark
>
>
> On Sun, 10 Sept 2023 at 19:44, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hi Jark,
> >
> > Thanks for the comments. Please see my comments inline.
> >
> > On Sat, Sep 9, 2023 at 4:13 PM Jark Wu <imj...@gmail.com> wrote:
> >
> > > Hi Xuannan,
> > >
> > > I leave my comments inline.
> > >
> > > > In the case where a user wants to
> > > > use a CDC source and also determine backlog status based on watermark
> > > > lag, we still need to define the rule when that occurs
> > >
> > > This rule should be defined by the source itself (who knows backlog
> > best),
> > > not by the framework. In the case of CDC source, it reports
> > isBacklog=true
> > > during snapshot stage, and report isBacklog=false during changelog
> stage
> > if
> > > watermark-lag is within the threshold.
> > >
> >
> > I am not sure I fully understand the difference between adding a
> job-level
> > config vs. adding a per-source config.
> >
> > In the case of CDC, its watermark lag should be either undefined or
> > really large in the snapshot stage. As a result, the proposed job-level
> > config will be applied only in the changelog stage. So there is no
> > difference between these two approaches in this particular case, right?
> >
> > There are two advantages of the job-level config over per-source config:
> >
> > 1) Configuration is simpler. For example, suppose a user has a Flink job
> > that consumes records from multiple Kafka sources and wants to determine
> > backlog status for these Kafka sources using the same watermark lag
> > threshold, there is no need for users to repeatedly specify this
> threshold
> > for each source.
> >
> > 2) There is a smaller number of public APIs overall. In particular,
> instead
> > of repeatedly adding a setProcessingBacklogWatermarkLagThreshold() API
> for
> > every source operator that has event-time watermark lag defined, we only
> > need to add one job-level config. Less public API means better simplicity
> > and maintainability in general.
> >
> > On the other hand, per-source config will be necessary if users want to
> > apply different watermark lag thresholds for different sources in the
> same
> > job. Personally, I find this a bit counter-intuitive for users to specify
> > different watermark lag thresholds in the same job.
> >
> > Do you think there is any real-world use-case that requires this? Could
> you
> > provide a specific use-case where per-source config can provide an
> > advantage over the job-level config?
> >
> >
> > > I think it's not intuitive to combine it with the logical OR operation.
> > > Even for the
> > > combination logic of backlog status from different channels, FLIP-309
> > said
> > > it is
> > > "up to the operator to determine its output records' isBacklog value"
> and
> > > proposed
> > > 3 different strategies. Therefore, I think backlog status from a single
> > > source should
> > > be up to the source.
> >
> >
> > For both the job-level config and the per-source config, it is eventually
> > up to the user to decide the computation logic of the backlog status.
> > Whether this mechanism is implemented at the per-source level or
> framework
> > level is probably more like an implementation detail.
> >
> > Eventually, I think the choice between these two approaches depends on
> > whether we have any use-case for users to specify different watermark lag
> > thresholds in the same job.
> >
> >
> > >
> > > IMO, a better API design is not how to resolve conflicts but not
> > > introducing conflicts.
> >
> >
> > Just to clarify, the current FLIP does not introduce any conflict. Each
> > source can have its own rule that specifies when the backlog can be true
> > (e.g. MySql CDC says the backlog should be true during the snapshot
> stage).
> > And we can have a job-level config that specifies when the backlog should
> > be true. Note that it is designed in such a way that none of these rules
> > specify when the backlog should be false. That is why there is no
> conflict
> > by definition.
> >
> >
> >
> > > Let the source determine backlog status removes the conflicts and I don't
> > > see big
> > > disadvantages.
> > >
> > > > It should not confuse the user that
> > > > DataStream#assignTimestampsAndWatermarks doesn't work with
> > > > backlog.watermark-lag-threshold, as it is not a source.
> > >
> > > Hmm, so this configuration may confuse Flink SQL users, because all
> > > watermarks
> > > are defined on the source DDL, but it may use a separate operator to
> emit
> > > watermarks
> > > if the source doesn't support emitting watermarks.
> > >
> >
> > If I understand your comments correctly, you mean that we might have a
> > Flink SQL DDL with user-defined watermark expressions. And users also
> want
> > to set the backlog to true if the watermark generated by that
> > user-specified expression exceeds a threshold.
> >
> > That is a good point and use-case. I agree we should also cover this
> > scenario. And we can update FLIP-328 to mention that the job-level config
> > will also be applicable when the watermark derived from the Flink SQL DDL
> > exceeds this threshold. Would this address your concern?
> >
> >
> > >
> > > > I think the description in the FLIP actually means the other way
> > > > around, where the job can never switch back to batch mode once it has
> > > > switched into streaming mode. This is to align with the current state
> > > > of FLIP-327[1], where only switching from batch to stream mode is
> > > > supported.
> > >
> > > This sounds like a limitation of FLIP-327 (that execution mode depends
> on
> > > backlog status).
> > > But the backlog status shouldn't have this limitation, because it is
> not
> > > only used for execution
> > > switching.
> > >
> >
> > You are right that this is a limitation. However, this is only a
> short-term
> > limitation which we added to make sure that we can focus on the
> capability
> > to switch from backlog=true to backlog=false. In the future, we will
> remove
> > this limitation and also support switching from backlog=false to
> > backlog=true.
> >
> > The capability to switch from backlog=true to backlog=false will
> mitigate a
> > lot of problems we are facing now. As it is common for users to start a
> > Flink job to process backlog data followed by real-time data. On the
> other
> > hand, switching from backlog=false to backlog=true is useful when there
> is
> > a traffic spike while the Flink job is processing real-time data, which
> is
> > also useful to address but less important than the previous one.
> >
> > Given that both features require considerable changes to the underlying
> > runtime, we think it might be useful and safe to tackle them one by one.
> >
> > Thanks again for the comments. Please let us know what you think.
> >
> > Best,
> > Dong
> >
> >
> > >
> > > Best,
> > > Jark
> > >
> > >
> > >
> > > On Fri, 8 Sept 2023 at 19:09, Xuannan Su <suxuanna...@gmail.com>
> wrote:
> > >
> > > > Hi Jark and Leonard,
> > > >
> > > > Thanks for the comments. Please see my reply below.
> > > >
> > > > @Jark
> > > >
> > > > > I think a better API doesn't compete with itself. Therefore, I'm in
> > > > favor of
> > > > > supporting the watermark lag threshold for each source without
> > > > introducing
> > > > > any framework API and configuration.
> > > >
> > > > I don't think supporting the watermark lag threshold for each source
> > > > can avoid the competition problem. In the case where a user wants to
> > > > use a CDC source and also determine backlog status based on watermark
> > > > lag, we still need to define the rule when that occurs. With that
> > > > said, I think it is more intuitive to combine it with the logical OR
> > > > operation, as the strategies (FLIP-309, FLIP-328) only determine when
> > > > the source's backlog status should be True. What do you think?
> > > >
> > > > > Besides, this can address another concern that the watermark may be
> > > > > generated by DataStream#assignTimestampsAnd
> > > > > Watermarks which doesn't
> > > > > work with the backlog.watermark-lag-threshold job config
> > > >
> > > > The description of the configuration explicitly states that "a source
> > > > would report isProcessingBacklog=true if its watermark lag exceeds
> the
> > > > configured value". It should not confuse the user that
> > > > DataStream#assignTimestampsAndWatermarks doesn't work with
> > > > backlog.watermark-lag-threshold, as it is not a source.
> > > >
> > > > > Does that mean the job can never back to streaming mode once
> switches
> > > > into
> > > > > backlog mode? It sounds like not a complete FLIP to me. Is it
> > possible
> > > to
> > > > > support switching back in this FLIP?
> > > >
> > > > I think the description in the FLIP actually means the other way
> > > > around, where the job can never switch back to batch mode once it has
> > > > switched into streaming mode. This is to align with the current state
> > > > of FLIP-327[1], where only switching from batch to stream mode is
> > > > supported.
> > > >
> > > > @Leonard
> > > >
> > > > > > The FLIP describe that: And it should report
> > > isProcessingBacklog=false
> > > > at the beginning of the snapshot stage.
> > > > > This should be “changelog stage”
> > > >
> > > > I think the description is in FLIP-309. Thanks for pointing out. I
> > > > updated the description.
> > > >
> > > > > I'm not sure if it's enough to support this feature only in FLIP-27
> > > > Source. Although we are pushing the sourceFunction API to be removed,
> > > these
> > > > APIs will be survive one or two versions in flink repo before they
> are
> > > > actually removed.
> > > >
> > > > I agree that it is good to support the SourceFunction API. However,
> > > > given that the SourceFunction API is marked as deprecated, I think I
> > > > will prioritize supporting the FLIP-27 Source. We can support the
> > > > SourceFunction API after the
> > > > FLIP-27 source. What do you think?
> > > >
> > > > Best regards,
> > > > Xuannan
> > > >
> > > > [1]
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data
> > > >
> > > >
> > > >
> > > >
> > > > On Fri, Sep 8, 2023 at 1:02 AM Leonard Xu <xbjt...@gmail.com> wrote:
> > > > >
> > > > > Thanks Xuannan for driving this FLIP !
> > > > >
> > > > > The proposal generally looks good to me, but I still left some
> > > comments:
> > > > >
> > > > > > One more question about the FLIP is that the FLIP says "Note that
> > > this
> > > > > > config does not support switching source's isProcessingBacklog
> from
> > > > false to true
> > > > > > for now.” Does that mean the job can never back to streaming mode
> > > once
> > > > switches into
> > > > > > backlog mode? It sounds like not a complete FLIP to me. Is it
> > > possible
> > > > to
> > > > > > support switching back in this FLIP?
> > > > > +1 for Jark’s concern, IIUC, the state transition of
> > > IsProcessingBacklog
> > > > depends on whether the data in the source is processing backlog data
> or
> > > > not. Different sources will have different backlog status and which
> may
> > > > change over time. From a general perspective, we should not have this
> > > > restriction.
> > > > >
> > > > > > The FLIP describe that: And it should report
> > > isProcessingBacklog=false
> > > > at the beginning of the snapshot stage.
> > > > > This should be “changelog stage”
> > > > >
> > > > > I'm not sure if it's enough to support this feature only in FLIP-27
> > > > Source. Although we are pushing the sourceFunction API to be removed,
> > > these
> > > > APIs will be survive one or two versions in flink repo before they
> are
> > > > actually removed.
> > > > >
> > > > > Best,
> > > > > Leonard
> > > > >
> > > > > >
> > > > > > Best,
> > > > > > Jark
> > > > > >
> > > > > >
> > > > > > On Thu, 7 Sept 2023 at 13:51, Xuannan Su <suxuanna...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > >> Hi all,
> > > > > >>
> > > > > >> Thank you for all the reviews and suggestions.
> > > > > >>
> > > > > >> I believe all the comments have been addressed. If there are no
> > > > > >> further comments, I plan to open the voting thread for this FLIP
> > > early
> > > > > >> next week.
> > > > > >>
> > > > > >> Best regards,
> > > > > >> Xuannan
> > > > > >>
> > > > > >> On Thu, Sep 7, 2023 at 12:09 AM Jing Ge
> > <j...@ververica.com.invalid
> > > >
> > > > > >> wrote:
> > > > > >>>
> > > > > >>> Hi Xuannan,
> > > > > >>>
> > > > > >>> I thought FLIP-328 will compete with FLIP-309 while setting the
> > > > value of
> > > > > >>> the backlog. Understood. Thanks for the hint.
> > > > > >>>
> > > > > >>> Best regards,
> > > > > >>> Jing
> > > > > >>>
> > > > > >>> On Wed, Sep 6, 2023 at 12:12 PM Xuannan Su <
> > suxuanna...@gmail.com>
> > > > > >> wrote:
> > > > > >>>
> > > > > >>>> Hi Jing,
> > > > > >>>>
> > > > > >>>> Thank you for the clarification.
> > > > > >>>>
> > > > > >>>> For the use case you mentioned, I believe we can utilize the
> > > > > >>>> HybridSource, as updated in FLIP-309[1], to determine the
> > backlog
> > > > > >>>> status. For example, if the user wants to process data before
> > > time T
> > > > > >>>> in batch mode and after time T in stream mode, they can set
> the
> > > > first
> > > > > >>>> source of the HybridSource to read up to time T and the last
> > > source
> > > > of
> > > > > >>>> the HybridSource to read from time T.
> > > > > >>>>
> > > > > >>>> Best,
> > > > > >>>> Xuannan
> > > > > >>>>
> > > > > >>>> [1]
> > > > > >>>>
> > > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> On Mon, Sep 4, 2023 at 10:36 PM Jing Ge
> > > <j...@ververica.com.invalid
> > > > >
> > > > > >>>> wrote:
> > > > > >>>>>
> > > > > >>>>> Hi Xuannan,
> > > > > >>>>>
> > > > > >>>>> Thanks for the clarification.
> > > > > >>>>>
> > > > > >>>>> 3. Event time and process time are two different things. It
> > might
> > > > be
> > > > > >>>> rarely
> > > > > >>>>> used, but conceptually, users can process data in the past
> > > within a
> > > > > >>>>> specific time range in the streaming mode. All data before
> that
> > > > range
> > > > > >>>> will
> > > > > >>>>> be considered as backlog and needed to be processed in the
> > batch
> > > > > >> mode,
> > > > > >>>>> like, e.g. the Present Perfect Progressive tense used in
> > English
> > > > > >>>> language.
> > > > > >>>>>
> > > > > >>>>> Best regards,
> > > > > >>>>> Jing
> > > > > >>>>>
> > > > > >>>>> On Thu, Aug 31, 2023 at 4:45 AM Xuannan Su <
> > > suxuanna...@gmail.com>
> > > > > >>>> wrote:
> > > > > >>>>>
> > > > > >>>>>> Hi Jing,
> > > > > >>>>>>
> > > > > >>>>>> Thanks for the reply.
> > > > > >>>>>>
> > > > > >>>>>> 1. You are absolutely right that the watermark lag threshold
> > > must
> > > > > >> be
> > > > > >>>>>> carefully set with a thorough understanding of watermark
> > > > > >> generation.
> > > > > >>>> It is
> > > > > >>>>>> crucial for users to take into account the WatermarkStrategy
> > > when
> > > > > >>>> setting
> > > > > >>>>>> the watermark lag threshold.
> > > > > >>>>>>
> > > > > >>>>>> 2. Regarding pure processing-time based stream processing
> > jobs,
> > > > > >>>>>> alternative strategies will be implemented to determine
> > whether
> > > > the
> > > > > >>>> job is
> > > > > >>>>>> processing backlog data. I have outlined two possible
> > strategies
> > > > > >> below:
> > > > > >>>>>>
> > > > > >>>>>> - Based on the source operator's state. For example, when
> > MySQL
> > > > CDC
> > > > > >>>> source
> > > > > >>>>>> is reading snapshot, it can claim isBacklog=true.
> > > > > >>>>>> - Based on metrics. For example, when busyTimeMsPerSecond
> (or
> > > > > >>>>>> backPressuredTimeMsPerSecond) > user_specified_threshold,
> then
> > > > > >>>>>> isBacklog=true.
> > > > > >>>>>>
> > > > > >>>>>> As of the strategies proposed in this FLIP, it rely on
> > generated
> > > > > >>>>>> watermarks. Therefore, if a user intends for the job to
> detect
> > > > > >> backlog
> > > > > >>>>>> status based on watermark, it is necessary to generate the
> > > > > >> watermark.
> > > > > >>>>>>
> > > > > >>>>>> 3. I'm afraid I'm not fully grasping your question. From my
> > > > > >>>> understanding,
> > > > > >>>>>> it should work in both cases. When event times are close to
> > the
> > > > > >>>> processing
> > > > > >>>>>> time, resulting in watermarks close to the processing time,
> > the
> > > > > >> job is
> > > > > >>>> not
> > > > > >>>>>> processing backlog data. On the other hand, when event times
> > are
> > > > > >> far
> > > > > >>>> from
> > > > > >>>>>> processing time, causing watermarks to also be distant, if
> the
> > > lag
> > > > > >>>>>> surpasses the defined threshold, the job is considered
> > > processing
> > > > > >>>> backlog
> > > > > >>>>>> data.
> > > > > >>>>>>
> > > > > >>>>>> Best,
> > > > > >>>>>> Xuannan
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>>> On Aug 31, 2023, at 02:56, Jing Ge
> > <j...@ververica.com.INVALID
> > > >
> > > > > >>>> wrote:
> > > > > >>>>>>>
> > > > > >>>>>>> Hi Xuannan,
> > > > > >>>>>>>
> > > > > >>>>>>> Thanks for the clarification. That is the part where I am
> > > trying
> > > > > >> to
> > > > > >>>>>>> understand your thoughts. I have some follow-up questions:
> > > > > >>>>>>>
> > > > > >>>>>>> 1. It depends strongly on the watermarkStrategy and how
> > > > > >> customized
> > > > > >>>>>>> watermark generation looks like. It mixes business logic
> with
> > > > > >>>> technical
> > > > > >>>>>>> implementation and technical data processing mode. The
> value
> > of
> > > > > >> the
> > > > > >>>>>>> watermark lag threshold must be set very carefully. If the
> > > value
> > > > > >> is
> > > > > >>>> too
> > > > > >>>>>>> small. any time, when the watermark generation logic is
> > > > > >>>> changed(business
> > > > > >>>>>>> logic changes lead to the threshold getting exceeded), the
> > same
> > > > > >> job
> > > > > >>>> might
> > > > > >>>>>>> be running surprisingly in backlog processing mode, i.e. a
> > > > > >> butterfly
> > > > > >>>>>>> effect. A comprehensive documentation is required to avoid
> > any
> > > > > >>>> confusion
> > > > > >>>>>>> for the users.
> > > > > >>>>>>> 2. Like Jark already mentioned, use cases that do not have
> > > > > >>>> watermarks,
> > > > > >>>>>>> like pure processing-time based stream processing[1] are
> not
> > > > > >>>> covered. It
> > > > > >>>>>> is
> > > > > >>>>>>> more or less a trade-off solution that does not support
> such
> > > use
> > > > > >>>> cases
> > > > > >>>>>> and
> > > > > >>>>>>> appropriate documentation is required. Forcing them to
> > > explicitly
> > > > > >>>>>> generate
> > > > > >>>>>>> watermarks that are never needed just because of this does
> > not
> > > > > >> sound
> > > > > >>>>>> like a
> > > > > >>>>>>> proper solution.
> > > > > >>>>>>> 3. If I am not mistaken, it only works for use cases where
> > > event
> > > > > >>>> times
> > > > > >>>>>> are
> > > > > >>>>>>> very close to the processing times, because the wall clock
> is
> > > > > >> used to
> > > > > >>>>>>> calculate the watermark lag and the watermark is generated
> > > based
> > > > > >> on
> > > > > >>>> the
> > > > > >>>>>>> event time.
> > > > > >>>>>>>
> > > > > >>>>>>> Best regards,
> > > > > >>>>>>> Jing
> > > > > >>>>>>>
> > > > > >>>>>>> [1]
> > > > > >>>>>>>
> > > > > >>>>>>
> > > > > >>>>
> > > > > >>
> > > >
> > >
> >
> https://github.com/apache/flink/blob/2c50b4e956305426f478b726d4de4a640a16b810/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L236
> > > > > >>>>>>>
> > > > > >>>>>>> On Wed, Aug 30, 2023 at 4:06 AM Xuannan Su <
> > > > > >> suxuanna...@gmail.com>
> > > > > >>>>>> wrote:
> > > > > >>>>>>>
> > > > > >>>>>>>> Hi Jing,
> > > > > >>>>>>>>
> > > > > >>>>>>>> Thank you for the suggestion.
> > > > > >>>>>>>>
> > > > > >>>>>>>> The definition of watermark lag is the same as the
> > > watermarkLag
> > > > > >>>> metric
> > > > > >>>>>> in
> > > > > >>>>>>>> FLIP-33[1]. More specifically, the watermark lag
> calculation
> > > is
> > > > > >>>>>> computed at
> > > > > >>>>>>>> the time when a watermark is emitted downstream in the
> > > following
> > > > > >>>> way:
> > > > > >>>>>>>> watermarkLag = CurrentTime - Watermark. I have added this
> > > > > >>>> description to
> > > > > >>>>>>>> the FLIP.
> > > > > >>>>>>>>
> > > > > >>>>>>>> I hope this addresses your concern.
> > > > > >>>>>>>>
> > > > > >>>>>>>> Best,
> > > > > >>>>>>>> Xuannan
> > > > > >>>>>>>>
> > > > > >>>>>>>> [1]
> > > > > >>>>>>>>
> > > > > >>>>>>
> > > > > >>>>
> > > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > > >>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>>>> On Aug 28, 2023, at 01:04, Jing Ge
> > > <j...@ververica.com.INVALID
> > > > > >>>
> > > > > >>>> wrote:
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Hi Xuannan,
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Thanks for the proposal. +1 for me.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> There is one tiny thing that I am not sure if I
> understand
> > it
> > > > > >>>>>> correctly.
> > > > > >>>>>>>>> Since there will be many different WatermarkStrategies
> and
> > > > > >>>> different
> > > > > >>>>>>>>> WatermarkGenerators. Could you please update the FLIP and
> > add
> > > > > >> the
> > > > > >>>>>>>>> description of how the watermark lag is calculated
> exactly?
> > > > > >> E.g.
> > > > > >>>>>>>> Watermark
> > > > > >>>>>>>>> lag = A - B with A is the timestamp of the watermark
> > emitted
> > > > > >> to the
> > > > > >>>>>>>>> downstream and B is....(this is the part I am not really
> > sure
> > > > > >> after
> > > > > >>>>>>>> reading
> > > > > >>>>>>>>> the FLIP).
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Best regards,
> > > > > >>>>>>>>> Jing
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su <
> > > > > >> suxuanna...@gmail.com>
> > > > > >>>>>>>> wrote:
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> Hi Jark,
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Thanks for the comments.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> I agree that the current solution cannot support jobs
> that
> > > > > >> cannot
> > > > > >>>>>> define
> > > > > >>>>>>>>>> watermarks. However, after considering the
> > > > > >> pending-record-based
> > > > > >>>>>>>> solution, I
> > > > > >>>>>>>>>> believe the current solution is superior for the target
> > use
> > > > > >> case
> > > > > >>>> as it
> > > > > >>>>>>>> is
> > > > > >>>>>>>>>> more intuitive for users. The backlog status gives users
> > the
> > > > > >>>> ability
> > > > > >>>>>> to
> > > > > >>>>>>>>>> balance between throughput and latency. Making this
> > > trade-off
> > > > > >>>> decision
> > > > > >>>>>>>>>> based on the watermark lag is more intuitive from the
> > user's
> > > > > >>>>>>>> perspective.
> > > > > >>>>>>>>>> For instance, a user can decide that if the job lags
> > behind
> > > > > >> the
> > > > > >>>>>> current
> > > > > >>>>>>>>>> time by more than 1 hour, the result is not usable. In
> > that
> > > > > >> case,
> > > > > >>>> we
> > > > > >>>>>> can
> > > > > >>>>>>>>>> optimize for throughput when the data lags behind by
> more
> > > > > >> than an
> > > > > >>>>>> hour.
> > > > > >>>>>>>>>> With the pending-record-based solution, it's challenging
> > for
> > > > > >>>> users to
> > > > > >>>>>>>>>> determine when to optimize for throughput and when to
> > > > > >> prioritize
> > > > > >>>>>>>> latency.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Regarding the limitations of the watermark-based
> solution:
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> 1. The current solution can support jobs with sources
> that
> > > > > >> have
> > > > > >>>> event
> > > > > >>>>>>>>>> time. Users can always define a watermark at the source
> > > > > >> operator,
> > > > > >>>> even
> > > > > >>>>>>>> if
> > > > > >>>>>>>>>> it's not used by downstream operators, such as streaming
> > > join
> > > > > >> and
> > > > > >>>>>>>> unbounded
> > > > > >>>>>>>>>> aggregate.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> 2.I don't believe it's accurate to say that the
> watermark
> > > lag
> > > > > >> will
> > > > > >>>>>> keep
> > > > > >>>>>>>>>> increasing if no data is generated in Kafka. The
> watermark
> > > > > >> lag and
> > > > > >>>>>>>> backlog
> > > > > >>>>>>>>>> status are determined at the moment when the watermark
> is
> > > > > >> emitted
> > > > > >>>> to
> > > > > >>>>>> the
> > > > > >>>>>>>>>> downstream operator. If no data is emitted from the
> > source,
> > > > > >> the
> > > > > >>>>>>>> watermark
> > > > > >>>>>>>>>> lag and backlog status will not be updated. If the
> > > > > >>>> WatermarkStrategy
> > > > > >>>>>>>> with
> > > > > >>>>>>>>>> idleness is used, the source becomes non-backlog when it
> > > > > >> becomes
> > > > > >>>> idle.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> 3. I think watermark lag is more intuitive to determine
> > if a
> > > > > >> job
> > > > > >>>> is
> > > > > >>>>>>>>>> processing backlog data. Even when using pending
> records,
> > it
> > > > > >>>> faces a
> > > > > >>>>>>>>>> similar issue. For example, if the source has 1K pending
> > > > > >> records,
> > > > > >>>>>> those
> > > > > > >>>>>>>>>> records can span from 1 day to 1 hour to 1 second. If
> the
> > > > > >> records
> > > > > >>>>>> span
> > > > > >>>>>>>> 1
> > > > > >>>>>>>>>> day, it's probably best to optimize for throughput. If
> > they
> > > > > >> span 1
> > > > > >>>>>>>> hour, it
> > > > > >>>>>>>>>> depends on the business logic. If they span 1 second,
> > > > > >> optimizing
> > > > > >>>> for
> > > > > >>>>>>>>>> latency is likely the better choice.
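
To make points 2 and 3 above concrete, here is a minimal sketch of a
watermark-lag-based backlog decision. It is not Flink code: BacklogTracker,
on_watermark, on_idle, and the one-hour threshold are hypothetical names and
values chosen for illustration. It assumes, as described above, that the
status is re-evaluated only when a watermark is emitted, and that an idle
source is treated as non-backlog.

```python
from dataclasses import dataclass

HOUR_MS = 60 * 60 * 1000


@dataclass
class BacklogTracker:
    """Hypothetical helper for illustration only, not a Flink API."""

    threshold_ms: int
    is_backlog: bool = False

    def on_watermark(self, watermark_ms: int, now_ms: int) -> bool:
        # The status changes only at the moment a watermark is emitted.
        self.is_backlog = (now_ms - watermark_ms) > self.threshold_ms
        return self.is_backlog

    def on_idle(self) -> bool:
        # Assumption from the discussion: an idle source is non-backlog.
        self.is_backlog = False
        return self.is_backlog


tracker = BacklogTracker(threshold_ms=HOUR_MS)
now = 10 * HOUR_MS

# Watermark is 2 hours behind wall-clock time: optimize for throughput.
print(tracker.on_watermark(now - 2 * HOUR_MS, now))

# Watermark is 1 second behind: optimize for latency.
print(tracker.on_watermark(now - 1000, now))

# No new watermark arrives for a long time: the status is simply not updated,
# so it does not drift to "backlog" just because wall-clock time advances.
print(tracker.is_backlog)
```

In this sketch the lag never "keeps increasing" while the source is silent,
because nothing recomputes it until the next watermark or idleness signal.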
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> In summary, I believe the watermark-based solution is a
> > > > > >> superior
> > > > > >>>>>> choice
> > > > > >>>>>>>>>> for the target use case where watermark/event time can
> be
> > > > > >> defined.
> > > > > >>>>>>>>>> Additionally, I haven't come across a scenario that
> > requires
> > > > > >>>>>> low-latency
> > > > > >>>>>>>>>> processing and reads from a source that cannot define
> > > > > >> watermarks.
> > > > > >>>> If
> > > > > >>>>>> we
> > > > > >>>>>>>>>> encounter such a use case, we can create another FLIP to
> > > > > >> address
> > > > > >>>> those
> > > > > >>>>>>>>>> needs in the future. What do you think?
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Best,
> > > > > >>>>>>>>>> Xuannan
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> On Aug 20, 2023, at 23:27, Jark Wu <imj...@gmail.com
> > > > > >> <mailto:
> > > > > >>>>>>>>>> imj...@gmail.com>> wrote:
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Hi Xuannan,
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Thanks for opening this discussion.
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> This current proposal may work in the mentioned
> watermark
> > > > > >> cases.
> > > > > >>>>>>>>>>> However, it seems this is not a general solution for
> > > sources
> > > > > >> to
> > > > > >>>>>>>> determine
> > > > > >>>>>>>>>>> "isProcessingBacklog".
> > > > > >>>>>>>>>>> From my point of view, there are 3 limitations of the
> > > current
> > > > > >>>>>> proposal:
> > > > > >>>>>>>>>>> 1. It doesn't cover jobs that don't have
> > > watermark/event-time
> > > > > >>>>>> defined,
> > > > > >>>>>>>>>>> for example streaming join and unbounded aggregate. We
> > may
> > > > > >> still
> > > > > >>>> need
> > > > > >>>>>>>> to
> > > > > >>>>>>>>>>> figure out solutions for them.
> > > > > > >>>>>>>>>>> 2. Watermark lag cannot be trusted, because it grows without
> > > > > > >>>>>>>>>>> bound if no data is generated in Kafka.
> > > > > >>>>>>>>>>> But in this case, there is no backlog at all.
> > > > > > >>>>>>>>>>> 3. Watermark lag does not reliably reflect the amount of backlog.
> > > > > > >>>>>>>>>>> If the watermark lag is 1 day, 1 hour, or 1 second, there may be
> > > > > > >>>>>>>>>>> only 1 pending record, which means no backlog at all.
> > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> Therefore, IMO, watermark may not be the ideal metric to
> > > > > > >>>>>>>>>>> determine "isProcessingBacklog".
> > > > > >>>>>>>>>>> What we need is something that reflects the number of
> > > records
> > > > > >>>>>>>> unprocessed
> > > > > >>>>>>>>>>> by the job.
> > > > > >>>>>>>>>>> Actually, that is the "pendingRecords" metric proposed
> in
> > > > > >>>> FLIP-33 and
> > > > > >>>>>>>> has
> > > > > >>>>>>>>>>> been implemented by Kafka source.
> > > > > >>>>>>>>>>> Did you consider using "pendingRecords" metric to
> > determine
> > > > > >>>>>>>>>>> "isProcessingBacklog"?
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Best,
> > > > > >>>>>>>>>>> Jark
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> [1]
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>
> > > > > >>>>
> > > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > > >>>>>>>>>> <
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>
> > > > > >>>>
> > > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> On Tue, 15 Aug 2023 at 12:04, Xintong Song <
> > > > > >>>> tonysong...@gmail.com
> > > > > >>>>>>>>>> <mailto:tonysong...@gmail.com>> wrote:
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>>> Sounds good to me.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> It is true that, if we are introducing the generalized
> > > > > >>>> watermark,
> > > > > >>>>>>>> there
> > > > > >>>>>>>>>>>> will be other watermark related concepts /
> > configurations
> > > > > >> that
> > > > > >>>> need
> > > > > >>>>>> to
> > > > > >>>>>>>>>> be
> > > > > >>>>>>>>>>>> updated anyway.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> Best,
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> Xintong
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> On Tue, Aug 15, 2023 at 11:30 AM Xuannan Su <
> > > > > >>>> suxuanna...@gmail.com
> > > > > >>>>>>>>>> <mailto:suxuanna...@gmail.com>> wrote:
> > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Hi Xintong,
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> Thank you for your suggestion.
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> After considering the idea of using a general
> > > configuration
> > > > > >>>> key, I
> > > > > >>>>>>>>>> think
> > > > > >>>>>>>>>>>>> it may not be a good idea for the reasons below.
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> While I agree that using a more general configuration
> > key
> > > > > >>>> provides
> > > > > >>>>>> us
> > > > > >>>>>>>>>>>> with
> > > > > >>>>>>>>>>>>> the flexibility to switch to other approaches to
> > > calculate
> > > > > >> the
> > > > > >>>> lag
> > > > > >>>>>> in
> > > > > >>>>>>>>>> the
> > > > > >>>>>>>>>>>>> future, the downside is that it may cause confusion
> for
> > > > > >> users.
> > > > > >>>> We
> > > > > >>>>>>>>>>>> currently
> > > > > >>>>>>>>>>>>> have fetchEventTimeLag, emitEventTimeLag, and
> > > watermarkLag
> > > > > >> in
> > > > > >>>> the
> > > > > >>>>>>>>>> source,
> > > > > >>>>>>>>>>>>> and it is not clear which specific lag we are
> referring
> > > to.
> > > > > >>>> With
> > > > > >>>>>> the
> > > > > >>>>>>>>>>>>> potential introduction of the Generalized Watermark
> > > > > >> mechanism
> > > > > >>>> in
> > > > > >>>>>> the
> > > > > >>>>>>>>>>>>> future, if I understand correctly, a watermark won't
> > > > > >>>> necessarily
> > > > > >>>>>> need
> > > > > >>>>>>>>>> to
> > > > > >>>>>>>>>>>> be
> > > > > > >>>>>>>>>>>>> a timestamp. I am concerned that the general configuration key
> > > > > > >>>>>>>>>>>>> may not be enough to cover all the use cases, and that we will
> > > > > > >>>>>>>>>>>>> need to introduce a general way to determine the backlog status
> > > > > > >>>>>>>>>>>>> regardless.
> > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> For the reasons above, I prefer introducing the configuration
> > > > > > >>>>>>>>>>>>> as is and changing it later with a deprecation or migration
> > > > > > >>>>>>>>>>>>> process. What do you think?
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> Best,
> > > > > >>>>>>>>>>>>> Xuannan
> > > > > >>>>>>>>>>>>> On Aug 14, 2023, 14:09 +0800, Xintong Song <
> > > > > >>>> tonysong...@gmail.com
> > > > > >>>>>>>>>> <mailto:tonysong...@gmail.com>>,
> > > > > >>>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>> Thanks for the explanation.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> I wonder if it makes sense to not expose this detail
> > via
> > > > > >> the
> > > > > >>>>>>>>>>>>> configuration
> > > > > >>>>>>>>>>>>>> option. To be specific, I suggest not mentioning the
> > > > > >>>> "watermark"
> > > > > >>>>>>>>>>>> keyword
> > > > > >>>>>>>>>>>>> in
> > > > > >>>>>>>>>>>>>> the configuration key and description.
> > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> - From the users' perspective, I think they only need to know
> > > > > > >>>>>>>>>>>>>> that when there's a lag higher than the given threshold, Flink
> > > > > > >>>>>>>>>>>>>> will consider the latency of individual records as less
> > > > > > >>>>>>>>>>>>>> important and prioritize throughput over it. They don't really
> > > > > > >>>>>>>>>>>>>> need the details of how the lags are calculated.
> > > > > >>>>>>>>>>>>>> - For the internal implementation, I also think
> using
> > > > > >>>> watermark
> > > > > >>>>>> lags
> > > > > >>>>>>>>>> is
> > > > > >>>>>>>>>>>>>> a good idea, for the reasons you've already
> mentioned.
> > > > > >>>> However,
> > > > > >>>>>> it's
> > > > > >>>>>>>>>>>> not
> > > > > >>>>>>>>>>>>>> the only possible option. Hiding this detail from
> > users
> > > > > >> would
> > > > > >>>> give
> > > > > >>>>>>>> us
> > > > > >>>>>>>>>>>> the
> > > > > >>>>>>>>>>>>>> flexibility to switch to other approaches if needed
> in
> > > > > >> future.
> > > > > >>>>>>>>>>>>>> - We are currently working on designing the
> > > > > >> ProcessFunction
> > > > > >>>> API
> > > > > >>>>>>>>>>>>>> (consider it as a DataStream API V2). There's an
> idea
> > to
> > > > > >>>>>> introduce a
> > > > > >>>>>>>>>>>>>> Generalized Watermark mechanism, where basically the
> > > > > >>>> watermark can
> > > > > >>>>>>>> be
> > > > > >>>>>>>>>>>>>> anything that needs to travel along the data-flow
> with
> > > > > >> certain
> > > > > >>>>>>>>>>>> alignment
> > > > > >>>>>>>>>>>>>> strategies, and event time watermark would be one
> > > specific
> > > > > >>>> case of
> > > > > >>>>>>>> it.
> > > > > >>>>>>>>>>>>> This
> > > > > >>>>>>>>>>>>>> is still an idea and has not been discussed and
> agreed
> > > on
> > > > > >> by
> > > > > >>>> the
> > > > > >>>>>>>>>>>>> community,
> > > > > >>>>>>>>>>>>>> and we are preparing a FLIP for it. But if we are
> > going
> > > > > >> for
> > > > > >>>> it,
> > > > > >>>>>> the
> > > > > >>>>>>>>>>>>> concept
> > > > > >>>>>>>>>>>>>> "watermark-lag-threshold" could be ambiguous.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> I do not intend to block the FLIP on this. I'd also
> be
> > > > > >> fine
> > > > > >>>> with
> > > > > >>>>>>>>>>>>>> introducing the configuration as is, and changing it
> > > > > >> later, if
> > > > > >>>>>>>> needed,
> > > > > >>>>>>>>>>>>> with
> > > > > >>>>>>>>>>>>>> a regular deprecation and migration process. Just
> > making
> > > > > >> my
> > > > > >>>>>>>>>>>> suggestions.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Best,
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Xintong
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> On Mon, Aug 14, 2023 at 12:00 PM Xuannan Su <
> > > > > >>>>>> suxuanna...@gmail.com
> > > > > >>>>>>>>>> <mailto:suxuanna...@gmail.com>>
> > > > > >>>>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> Hi Xintong,
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> Thanks for the reply.
> > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> I have considered using the timestamp in the records to
> > > > > > >>>>>>>>>>>>>>> determine the backlog status, and decided to use the watermark
> > > > > > >>>>>>>>>>>>>>> in the end. By definition, a watermark is the time progress
> > > > > > >>>>>>>>>>>>>>> indication in the data stream. It indicates that the stream’s
> > > > > > >>>>>>>>>>>>>>> event time has progressed to some specific time. On the other
> > > > > > >>>>>>>>>>>>>>> hand, the timestamp in the records is usually used to generate
> > > > > > >>>>>>>>>>>>>>> the watermark. Therefore, it appears more appropriate and
> > > > > > >>>>>>>>>>>>>>> intuitive to calculate the event time lag from the watermark
> > > > > > >>>>>>>>>>>>>>> and use it to determine the backlog status. And by using the
> > > > > > >>>>>>>>>>>>>>> watermark, we can easily deal with out-of-order and idle data.
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> Please let me know if you have further questions.
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> Best,
> > > > > >>>>>>>>>>>>>>> Xuannan
> > > > > >>>>>>>>>>>>>>> On Aug 10, 2023, 20:23 +0800, Xintong Song <
> > > > > >>>>>> tonysong...@gmail.com
> > > > > >>>>>>>>>> <mailto:tonysong...@gmail.com>>,
> > > > > >>>>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>>>> Thanks for preparing the FLIP, Xuannan.
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> +1 in general.
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> A quick question, could you explain why we are
> > relying
> > > > > >> on
> > > > > >>>> the
> > > > > >>>>>>>>>>>>> watermark
> > > > > >>>>>>>>>>>>>>> for
> > > > > >>>>>>>>>>>>>>>> emitting the record attribute? Why not use
> > timestamps
> > > > > >> in the
> > > > > >>>>>>>>>>>>> records? I
> > > > > > >>>>>>>>>>>>>>>> don't see any concern in using watermarks. Just wondering if
> > > > > > >>>>>>>>>>>>>>>> there are any deep considerations behind this.
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> Best,
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> Xintong
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su <
> > > > > >>>>>> suxuanna...@gmail.com
> > > > > >>>>>>>>>> <mailto:suxuanna...@gmail.com>>
> > > > > >>>>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> Hi all,
> > > > > >>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>> I am opening this thread to discuss FLIP-328: Allow source
> > > > > > >>>>>>>>>>>>>>>>> operators to determine isProcessingBacklog based on watermark
> > > > > > >>>>>>>>>>>>>>>>> lag [1]. We had several discussions with Dong Ling about the
> > > > > > >>>>>>>>>>>>>>>>> design, and thanks for all the valuable advice.
> > > > > >>>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>> The FLIP targets the use case where users want to run a Flink
> > > > > > >>>>>>>>>>>>>>>>> job to backfill historical data in a high-throughput manner
> > > > > > >>>>>>>>>>>>>>>>> and then continue processing real-time data with low latency.
> > > > > > >>>>>>>>>>>>>>>>> Building upon the backlog concept introduced in FLIP-309 [2],
> > > > > > >>>>>>>>>>>>>>>>> this proposal enables sources to report their status of
> > > > > > >>>>>>>>>>>>>>>>> processing backlog based on the watermark lag.
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> We would greatly appreciate any comments or
> > feedback
> > > > > >> you
> > > > > >>>> may
> > > > > >>>>>> have
> > > > > >>>>>>>>>>>>> on
> > > > > >>>>>>>>>>>>>>> this
> > > > > >>>>>>>>>>>>>>>>> proposal.
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> Best,
> > > > > >>>>>>>>>>>>>>>>> Xuannan
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> [1]
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>
> > > > > >>>>
> > > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> > > > > >>>>>>>>>> <
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>
> > > > > >>>>
> > > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> [2]
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>
> > > > > >>>>
> > > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > > > > >>>>>>>>>> <
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>
> > > > > >>>>
> > > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > > > > >>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>
> > > > > >>
> > > > >
> > > >
> > >
> >
>
