Hi Dong,

> Note that we can not simply enforce the semantics of "any invocation of
> setIsProcessingBacklog(false) will set the job's backlog status to false".
> Suppose we have a job with two operators, where operatorA invokes
> setIsProcessingBacklog(false) and operatorB invokes
> setIsProcessingBacklog(true). There will be conflict if we use the
> semantics of "any invocation of setIsProcessingBacklog(false) will set the
> job's backlog status to false".

So it should set job's backlog status to false if the job has only a single
source,
right?

Could you elaborate on the behavior if there is a job with a single source,
and the watermark lag exceeds the configured value (should set backlog to
true?),
but the source invokes "setIsProcessingBacklog(false)"? Or the inverse one,
the source invokes "setIsProcessingBacklog(false)" first, but the watermark
lag
exceeds the configured value.

This is the conflict I'm concerned about.

Best,
Jark

On Fri, 15 Sept 2023 at 12:00, Dong Lin <lindon...@gmail.com> wrote:

> Hi Jark,
>
> Please see my comments inline.
>
> On Fri, Sep 15, 2023 at 10:35 AM Jark Wu <imj...@gmail.com> wrote:
>
> > Hi Dong,
> >
> > Please see my comments inline below.
>
>
> > > Hmm.. can you explain what you mean by "different watermark delay
> > > definitions for each source"?
> >
> > For example, "table1" defines a watermark with delay 5 seconds,
> > "table2" defines a watermark with delay 10 seconds. They have different
> > watermark delay definitions. So it is also reasonable they have different
> > watermark lag definitions, e.g., "table1" allows "10mins" and "table2"
> > allows "20mins".
> >
>
> I think the watermark delay you mentioned above is conceptually /
> fundamentally different from the watermark-lag-threshold proposed in this
> FLIP.
>
> It might be useful to revisit the semantics of these two concepts:
> - watermark delay is used to account for the maximum amount of orderliness
> that users expect (or willing to wait for) for records from a given source.
> - watermark-lag-threshold is used to define when processing latency is no
> longer important (e.g. because data is already stale).
>
> Even though users might expect different out of orderliness for different
> sources, users do not necessarily have different definitions / thresholds
> for when a record is considered "already stale".
>
>
> >
> > > I think there is probably misunderstanding here. FLIP-309 does NOT
> > directly
> > > specify when backlog is false. It is intentionally specified in such a
> > way
> > > that there will  not be any conflict between these rules.
> >
> > Do you mean FLIP-309 doesn't allow to specify backlog to be false?
> > Is this mentioned in FLIP-309? This is completely different from what I
> >
>
> Can you explain what you mean by "allow to specify backlog to be false"?
>
> If what you mean is that "can invoke setIsProcessingBacklog(false)", then
> FLIP-309 supports doing this.
>
> If what you mean is that "any invocation of setIsProcessingBacklog(false)
> will set the job's backlog status to false", then FLIP-309 does not support
> this. I believe the existing Java doc of this API and FLIP-309 is
> compatible with this explanation.
>
> Note that we can not simply enforce the semantics of "any invocation of
> setIsProcessingBacklog(false) will set the job's backlog status to false".
> Suppose we have a job with two operators, where operatorA invokes
> setIsProcessingBacklog(false) and operatorB invokes
> setIsProcessingBacklog(true). There will be conflict if we use the
> semantics of "any invocation of setIsProcessingBacklog(false) will set the
> job's backlog status to false".
>
> Would this answer your question?
>
> Best,
> Dong
>
>
> > understand. From the API interface "ctx.setIsProcessingBacklog(boolean)",
> > it allows users to invoke "setIsProcessingBacklog(false)". And FLIP-309
> > also says "MySQL CDC source should report isProcessingBacklog=false
> > at the beginning of the changelog stage." If not, maybe we need to
> revisit
> > FLIP-309.
>
>
> > Best,
> > Jark
> >
> >
> >
> > On Fri, 15 Sept 2023 at 08:41, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hi Jark,
> > >
> > > Do you have any follow-up comment?
> > >
> > > My gut feeling is that suppose we need to support per-source watermark
> > lag
> > > specification in the future (not sure we have a use-case for this right
> > > now), we can add such a config in the future with a follow-up FLIP. The
> > > job-level config will still be useful as it makes users' configuration
> > > simpler for common scenarios.
> > >
> > > If it is OK, can we agree to make incremental progress for Flink and
> > start
> > > a voting thread for this FLIP?
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > > On Mon, Sep 11, 2023 at 4:41 PM Jark Wu <imj...@gmail.com> wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > Please see my comments inline.
> > > >
> > > > >  As a result, the proposed job-level
> > > > > config will be applied only in the changelog stage. So there is no
> > > > > difference between these two approaches in this particular case,
> > right?
> > > >
> > > > How the job-level config can be applied ONLY in the changelog stage?
> > > > I think it is only possible if it is implemented by the CDC source
> > > itself,
> > > > because the framework doesn't know which stage of the source is.
> > > > Know that the CDC source may emit watermarks with a very small lag
> > > > in the snapshot stage, and the job-level config may turn the backlog
> > > > status into false.
> > > >
> > > > > On the other hand, per-source config will be necessary if users
> want
> > to
> > > > > apply different watermark lag thresholds for different sources in
> the
> > > > same
> > > > > job.
> > > >
> > > > We also have different watermark delay definitions for each source,
> > > > I think this's also reasonable and necessary to have different
> > watermark
> > > > lags.
> > > >
> > > >
> > > > > Each source can have its own rule that specifies when the backlog
> can
> > > be
> > > > true
> > > > > (e.g. MySql CDC says the backlog should be true during the snapshot
> > > > stage).
> > > > > And we can have a job-level config that specifies when the backlog
> > > should
> > > > > be true. Note that it is designed in such a way that none of these
> > > rules
> > > > > specify when the backlog should be false. That is why there is no
> > > > conflict
> > > > > by definition.
> > > >
> > > > IIUC, FLIP-309 provides `setIsProcessingBacklog` to specify when the
> > > > backlog
> > > > is true and when is FALSE. This conflicts with the job-level config
> as
> > it
> > > > will turn
> > > > the status into true.
> > > >
> > > > > If I understand your comments correctly, you mean that we might
> have
> > a
> > > > > Flink SQL DDL with user-defined watermark expressions. And users
> also
> > > > want
> > > > > to set the backlog to true if the watermark generated by that
> > > > > user-specified expression exceeds a threshold.
> > > >
> > > > No. I mean the source may not support generating watermarks, so the
> > > > watermark
> > > > expression is applied in a following operator (instead of in the
> source
> > > > operator).
> > > > This will result in the watermark lag doesn't work in this case and
> > > confuse
> > > > users.
> > > >
> > > > > You are right that this is a limitation. However, this is only a
> > > > short-term
> > > > > limitation which we added to make sure that we can focus on the
> > > > capability
> > > > > to switch from backlog=true to backlog=false. In the future, we
> will
> > > > remove
> > > > > this limitation and also support switching from backlog=false to
> > > > > backlog=true.
> > > >
> > > > I can understand it may be difficult to support runtime mode
> switching
> > > back
> > > > and forth.
> > > > However, I think this should be a limitation of FLIP-327, not
> FLIP-328.
> > > > IIUC,
> > > > FLIP-309 doesn't have this limitation, right? I just don't understand
> > > > what's the
> > > > challenge to switch a flag?
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > >
> > > > On Sun, 10 Sept 2023 at 19:44, Dong Lin <lindon...@gmail.com> wrote:
> > > >
> > > > > Hi Jark,
> > > > >
> > > > > Thanks for the comments. Please see my comments inline.
> > > > >
> > > > > On Sat, Sep 9, 2023 at 4:13 PM Jark Wu <imj...@gmail.com> wrote:
> > > > >
> > > > > > Hi Xuannan,
> > > > > >
> > > > > > I leave my comments inline.
> > > > > >
> > > > > > > In the case where a user wants to
> > > > > > > use a CDC source and also determine backlog status based on
> > > watermark
> > > > > > > lag, we still need to define the rule when that occurs
> > > > > >
> > > > > > This rule should be defined by the source itself (who knows
> backlog
> > > > > best),
> > > > > > not by the framework. In the case of CDC source, it reports
> > > > > isBacklog=true
> > > > > > during snapshot stage, and report isBacklog=false during
> changelog
> > > > stage
> > > > > if
> > > > > > watermark-lag is within the threshold.
> > > > > >
> > > > >
> > > > > I am not sure I fully understand the difference between adding a
> > > > job-level
> > > > > config vs. adding a per-source config.
> > > > >
> > > > > In the case of CDC, its watermark lag should be either unde-defined
> > or
> > > > > really large in the snapshot stage. As a result, the proposed
> > job-level
> > > > > config will be applied only in the changelog stage. So there is no
> > > > > difference between these two approaches in this particular case,
> > right?
> > > > >
> > > > > There are two advantages of the job-level config over per-source
> > > config:
> > > > >
> > > > > 1) Configuration is simpler. For example, suppose a user has a
> Flink
> > > job
> > > > > that consumes records from multiple Kafka sources and wants to
> > > determine
> > > > > backlog status for these Kafka sources using the same watermark lag
> > > > > threshold, there is no need for users to repeatedly specify this
> > > > threshold
> > > > > for each source.
> > > > >
> > > > > 2) There is a smaller number of public APIs overall. In particular,
> > > > instead
> > > > > of repeatedly adding a setProcessingBacklogWatermarkLagThreshold()
> > API
> > > > for
> > > > > every source operator that has even-time watermark lag defined, we
> > only
> > > > > need to add one job-level config. Less public API means better
> > > simplicity
> > > > > and maintainability in general.
> > > > >
> > > > > On the other hand, per-source config will be necessary if users
> want
> > to
> > > > > apply different watermark lag thresholds for different sources in
> the
> > > > same
> > > > > job. Personally, I find this a bit counter-intuitive for users to
> > > specify
> > > > > different watermark lag thresholds in the same job.
> > > > >
> > > > > Do you think there is any real-word use-case that requires this?
> > Could
> > > > you
> > > > > provide a specific use-case where per-source config can provide an
> > > > > advantage over the job-level config?
> > > > >
> > > > >
> > > > > > I think it's not intuitive to combine it with the logical OR
> > > operation.
> > > > > > Even for the
> > > > > > combination logic of backlog status from different channels,
> > FLIP-309
> > > > > said
> > > > > > it is
> > > > > > "up to the operator to determine its output records' isBacklog
> > value"
> > > > and
> > > > > > proposed
> > > > > > 3 different strategies. Therefore, I think backlog status from a
> > > single
> > > > > > source should
> > > > > > be up to the source.
> > > > >
> > > > >
> > > > > For both the job-level config and the per-source config, it is
> > > eventually
> > > > > up to the user to decide the computation logic of the backlog
> status.
> > > > > Whether this mechanism is implemented at the per-source level or
> > > > framework
> > > > > level is probably more like an implementation detail.
> > > > >
> > > > > Eventually, I think the choice between these two approaches depends
> > on
> > > > > whether we have any use-case for users to specify different
> watermark
> > > lag
> > > > > thresholds in the same job.
> > > > >
> > > > >
> > > > > >
> > > > > > IMO, a better API design is not how to resolve conflicts but not
> > > > > > introducing conflicts.
> > > > >
> > > > >
> > > > > Just to clarify, the current FLIP does not introduce any conflict.
> > Each
> > > > > source can have its own rule that specifies when the backlog can be
> > > true
> > > > > (e.g. MySql CDC says the backlog should be true during the snapshot
> > > > stage).
> > > > > And we can have a job-level config that specifies when the backlog
> > > should
> > > > > be true. Note that it is designed in such a way that none of these
> > > rules
> > > > > specify when the backlog should be false. That is why there is no
> > > > conflict
> > > > > by definition.
> > > > >
> > > > >
> > > > >
> > > > > Let the source determine backlog status removes the conflicts and I
> > > don't
> > > > > > see big
> > > > > > disadvantages.
> > > > > >
> > > > > > > It should not confuse the user that
> > > > > > > DataStream#assignTimestampsAndWatermarks doesn't work with
> > > > > > > backlog.watermark-lag-threshold, as it is not a source.
> > > > > >
> > > > > > Hmm, so this configuration may confuse Flink SQL users, because
> all
> > > > > > watermarks
> > > > > > are defined on the source DDL, but it may use a separate operator
> > to
> > > > emit
> > > > > > watermarks
> > > > > > if the source doesn't support emitting watermarks.
> > > > > >
> > > > >
> > > > > If I understand your comments correctly, you mean that we might
> have
> > a
> > > > > Flink SQL DDL with user-defined watermark expressions. And users
> also
> > > > want
> > > > > to set the backlog to true if the watermark generated by that
> > > > > user-specified expression exceeds a threshold.
> > > > >
> > > > > That is a good point and use-case. I agree we should also cover
> this
> > > > > scenario. And we can update FLIP-328 to mention that the job-level
> > > config
> > > > > will also be applicable when the watermark derived from the Flink
> SQL
> > > DDL
> > > > > exceeds this threshold. Would this address your concern?
> > > > >
> > > > >
> > > > > >
> > > > > > > I think the description in the FLIP actually means the other
> way
> > > > > > > around, where the job can never switch back to batch mode once
> it
> > > has
> > > > > > > switched into streaming mode. This is to align with the current
> > > state
> > > > > > > of FLIP-327[1], where only switching from batch to stream mode
> is
> > > > > > > supported.
> > > > > >
> > > > > > This sounds like a limitation of FLIP-327 (that execution mode
> > > depends
> > > > on
> > > > > > backlog status).
> > > > > > But the backlog status shouldn't have this limitation, because it
> > is
> > > > not
> > > > > > only used for execution
> > > > > > switching.
> > > > > >
> > > > >
> > > > > You are right that this is a limitation. However, this is only a
> > > > short-term
> > > > > limitation which we added to make sure that we can focus on the
> > > > capability
> > > > > to switch from backlog=true to backlog=false. In the future, we
> will
> > > > remove
> > > > > this limitation and also support switching from backlog=false to
> > > > > backlog=true.
> > > > >
> > > > > The capability to switch from backlog=true to backlog=false will
> > > > mitigate a
> > > > > lot of problems we are facing now. As it is common for users to
> > start a
> > > > > Flink job to process backlog data followed by real-time data. On
> the
> > > > other
> > > > > hand, switching from backlog=false to backlog=true is useful when
> > there
> > > > is
> > > > > a traffic spike while the Flink job is processing real-time data,
> > which
> > > > is
> > > > > also useful to address but less important than the previous one.
> > > > >
> > > > > Given that both features require considerable changes to the
> > underlying
> > > > > runtime, we think it might be useful and safe to tackle them one by
> > > one.
> > > > >
> > > > > Thanks again for the comments. Please let us know what you think.
> > > > >
> > > > > Best,
> > > > > Dong
> > > > >
> > > > >
> > > > > >
> > > > > > Best,
> > > > > > Jark
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Fri, 8 Sept 2023 at 19:09, Xuannan Su <suxuanna...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > Hi Jark and Leonard,
> > > > > > >
> > > > > > > Thanks for the comments. Please see my reply below.
> > > > > > >
> > > > > > > @Jark
> > > > > > >
> > > > > > > > I think a better API doesn't compete with itself. Therefore,
> > I'm
> > > in
> > > > > > > favor of
> > > > > > > > supporting the watermark lag threshold for each source
> without
> > > > > > > introducing
> > > > > > > > any framework API and configuration.
> > > > > > >
> > > > > > > I don't think supporting the watermark lag threshold for each
> > > source
> > > > > > > can avoid the competition problem. In the case where a user
> wants
> > > to
> > > > > > > use a CDC source and also determine backlog status based on
> > > watermark
> > > > > > > lag, we still need to define the rule when that occurs. With
> that
> > > > > > > said, I think it is more intuitive to combine it with the
> logical
> > > OR
> > > > > > > operation, as the strategies (FLIP-309, FLIP-328) only
> determine
> > > when
> > > > > > > the source's backlog status should be True. What do you think?
> > > > > > >
> > > > > > > > Besides, this can address another concern that the watermark
> > may
> > > be
> > > > > > > > generated by DataStream#assignTimestampsAnd
> > > > > > > > Watermarks which doesn't
> > > > > > > > work with the backlog.watermark-lag-threshold job config
> > > > > > >
> > > > > > > The description of the configuration explicitly states that "a
> > > source
> > > > > > > would report isProcessingBacklog=true if its watermark lag
> > exceeds
> > > > the
> > > > > > > configured value". It should not confuse the user that
> > > > > > > DataStream#assignTimestampsAndWatermarks doesn't work with
> > > > > > > backlog.watermark-lag-threshold, as it is not a source.
> > > > > > >
> > > > > > > > Does that mean the job can never back to streaming mode once
> > > > switches
> > > > > > > into
> > > > > > > > backlog mode? It sounds like not a complete FLIP to me. Is it
> > > > > possible
> > > > > > to
> > > > > > > > support switching back in this FLIP?
> > > > > > >
> > > > > > > I think the description in the FLIP actually means the other
> way
> > > > > > > around, where the job can never switch back to batch mode once
> it
> > > has
> > > > > > > switched into streaming mode. This is to align with the current
> > > state
> > > > > > > of FLIP-327[1], where only switching from batch to stream mode
> is
> > > > > > > supported.
> > > > > > >
> > > > > > > @Leonard
> > > > > > >
> > > > > > > > > The FLIP describe that: And it should report
> > > > > > isProcessingBacklog=false
> > > > > > > at the beginning of the snapshot stage.
> > > > > > > > This should be “changelog stage”
> > > > > > >
> > > > > > > I think the description is in FLIP-309. Thanks for pointing
> out.
> > I
> > > > > > > updated the description.
> > > > > > >
> > > > > > > > I'm not sure if it's enough to support this feature only in
> > > FLIP-27
> > > > > > > Source. Although we are pushing the sourceFunction API to be
> > > removed,
> > > > > > these
> > > > > > > APIs will be survive one or two versions in flink repo before
> > they
> > > > are
> > > > > > > actually removed.
> > > > > > >
> > > > > > > I agree that it is good to support the SourceFunction API.
> > However,
> > > > > > > given that the SourceFunction API is marked as deprecated, I
> > think
> > > I
> > > > > > > will prioritize supporting the FLIP-27 Source. We can support
> the
> > > > > > > SourceFunction API after the
> > > > > > > FLIP-27 source. What do you think?
> > > > > > >
> > > > > > > Best regards,
> > > > > > > Xuannan
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Sep 8, 2023 at 1:02 AM Leonard Xu <xbjt...@gmail.com>
> > > wrote:
> > > > > > > >
> > > > > > > > Thanks Xuannan for driving this FLIP !
> > > > > > > >
> > > > > > > > The proposal generally looks good to me, but I still left
> some
> > > > > > comments:
> > > > > > > >
> > > > > > > > > One more question about the FLIP is that the FLIP says
> "Note
> > > that
> > > > > > this
> > > > > > > > > config does not support switching source's
> > isProcessingBacklog
> > > > from
> > > > > > > false to true
> > > > > > > > > for now.” Does that mean the job can never back to
> streaming
> > > mode
> > > > > > once
> > > > > > > switches into
> > > > > > > > > backlog mode? It sounds like not a complete FLIP to me. Is
> it
> > > > > > possible
> > > > > > > to
> > > > > > > > > support switching back in this FLIP?
> > > > > > > > +1 for Jark’s concern, IIUC, the state transition of
> > > > > > IsProcessingBacklog
> > > > > > > depends on whether the data in the source is processing backlog
> > > data
> > > > or
> > > > > > > not. Different sources will have different backlog status and
> > which
> > > > may
> > > > > > > change over time. From a general perspective, we should not
> have
> > > this
> > > > > > > restriction.
> > > > > > > >
> > > > > > > > > The FLIP describe that: And it should report
> > > > > > isProcessingBacklog=false
> > > > > > > at the beginning of the snapshot stage.
> > > > > > > > This should be “changelog stage”
> > > > > > > >
> > > > > > > > I'm not sure if it's enough to support this feature only in
> > > FLIP-27
> > > > > > > Source. Although we are pushing the sourceFunction API to be
> > > removed,
> > > > > > these
> > > > > > > APIs will be survive one or two versions in flink repo before
> > they
> > > > are
> > > > > > > actually removed.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Leonard
> > > > > > > >
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Jark
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Thu, 7 Sept 2023 at 13:51, Xuannan Su <
> > > suxuanna...@gmail.com>
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > >> Hi all,
> > > > > > > > >>
> > > > > > > > >> Thank you for all the reviews and suggestions.
> > > > > > > > >>
> > > > > > > > >> I believe all the comments have been addressed. If there
> are
> > > no
> > > > > > > > >> further comments, I plan to open the voting thread for
> this
> > > FLIP
> > > > > > early
> > > > > > > > >> next week.
> > > > > > > > >>
> > > > > > > > >> Best regards,
> > > > > > > > >> Xuannan
> > > > > > > > >>
> > > > > > > > >> On Thu, Sep 7, 2023 at 12:09 AM Jing Ge
> > > > > <j...@ververica.com.invalid
> > > > > > >
> > > > > > > > >> wrote:
> > > > > > > > >>>
> > > > > > > > >>> Hi Xuannan,
> > > > > > > > >>>
> > > > > > > > >>> I thought FLIP-328 will compete with FLIP-309 while
> setting
> > > the
> > > > > > > value of
> > > > > > > > >>> the backlog. Understood. Thanks for the hint.
> > > > > > > > >>>
> > > > > > > > >>> Best regards,
> > > > > > > > >>> Jing
> > > > > > > > >>>
> > > > > > > > >>> On Wed, Sep 6, 2023 at 12:12 PM Xuannan Su <
> > > > > suxuanna...@gmail.com>
> > > > > > > > >> wrote:
> > > > > > > > >>>
> > > > > > > > >>>> Hi Jing,
> > > > > > > > >>>>
> > > > > > > > >>>> Thank you for the clarification.
> > > > > > > > >>>>
> > > > > > > > >>>> For the use case you mentioned, I believe we can utilize
> > the
> > > > > > > > >>>> HybridSource, as updated in FLIP-309[1], to determine
> the
> > > > > backlog
> > > > > > > > >>>> status. For example, if the user wants to process data
> > > before
> > > > > > time T
> > > > > > > > >>>> in batch mode and after time T in stream mode, they can
> > set
> > > > the
> > > > > > > first
> > > > > > > > >>>> source of the HybridSource to read up to time T and the
> > last
> > > > > > source
> > > > > > > of
> > > > > > > > >>>> the HybridSource to read from time T.
> > > > > > > > >>>>
> > > > > > > > >>>> Best,
> > > > > > > > >>>> Xuannan
> > > > > > > > >>>>
> > > > > > > > >>>> [1]
> > > > > > > > >>>>
> > > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > > > > > > > >>>>
> > > > > > > > >>>>
> > > > > > > > >>>> On Mon, Sep 4, 2023 at 10:36 PM Jing Ge
> > > > > > <j...@ververica.com.invalid
> > > > > > > >
> > > > > > > > >>>> wrote:
> > > > > > > > >>>>>
> > > > > > > > >>>>> Hi Xuannan,
> > > > > > > > >>>>>
> > > > > > > > >>>>> Thanks for the clarification.
> > > > > > > > >>>>>
> > > > > > > > >>>>> 3. Event time and process time are two different
> things.
> > It
> > > > > might
> > > > > > > be
> > > > > > > > >>>> rarely
> > > > > > > > >>>>> used, but conceptually, users can process data in the
> > past
> > > > > > within a
> > > > > > > > >>>>> specific time range in the streaming mode. All data
> > before
> > > > that
> > > > > > > range
> > > > > > > > >>>> will
> > > > > > > > >>>>> be considered as backlog and needed to be processed in
> > the
> > > > > batch
> > > > > > > > >> mode,
> > > > > > > > >>>>> like, e.g. the Present Perfect Progressive tense used
> in
> > > > > English
> > > > > > > > >>>> language.
> > > > > > > > >>>>>
> > > > > > > > >>>>> Best regards,
> > > > > > > > >>>>> Jing
> > > > > > > > >>>>>
> > > > > > > > >>>>> On Thu, Aug 31, 2023 at 4:45 AM Xuannan Su <
> > > > > > suxuanna...@gmail.com>
> > > > > > > > >>>> wrote:
> > > > > > > > >>>>>
> > > > > > > > >>>>>> Hi Jing,
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> Thanks for the reply.
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> 1. You are absolutely right that the watermark lag
> > > threshold
> > > > > > must
> > > > > > > > >> be
> > > > > > > > >>>>>> carefully set with a thorough understanding of
> watermark
> > > > > > > > >> generation.
> > > > > > > > >>>> It is
> > > > > > > > >>>>>> crucial for users to take into account the
> > > WatermarkStrategy
> > > > > > when
> > > > > > > > >>>> setting
> > > > > > > > >>>>>> the watermark lag threshold.
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> 2. Regarding pure processing-time based stream
> > processing
> > > > > jobs,
> > > > > > > > >>>>>> alternative strategies will be implemented to
> determine
> > > > > whether
> > > > > > > the
> > > > > > > > >>>> job is
> > > > > > > > >>>>>> processing backlog data. I have outlined two possible
> > > > > strategies
> > > > > > > > >> below:
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> - Based on the source operator's state. For example,
> > when
> > > > > MySQL
> > > > > > > CDC
> > > > > > > > >>>> source
> > > > > > > > >>>>>> is reading snapshot, it can claim isBacklog=true.
> > > > > > > > >>>>>> - Based on metrics. For example, when
> > busyTimeMsPerSecond
> > > > (or
> > > > > > > > >>>>>> backPressuredTimeMsPerSecond) >
> > user_specified_threshold,
> > > > then
> > > > > > > > >>>>>> isBacklog=true.
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> As of the strategies proposed in this FLIP, it rely on
> > > > > generated
> > > > > > > > >>>>>> watermarks. Therefore, if a user intends for the job
> to
> > > > detect
> > > > > > > > >> backlog
> > > > > > > > >>>>>> status based on watermark, it is necessary to generate
> > the
> > > > > > > > >> watermark.
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> 3. I'm afraid I'm not fully grasping your question.
> From
> > > my
> > > > > > > > >>>> understanding,
> > > > > > > > >>>>>> it should work in both cases. When event times are
> close
> > > to
> > > > > the
> > > > > > > > >>>> processing
> > > > > > > > >>>>>> time, resulting in watermarks close to the processing
> > > time,
> > > > > the
> > > > > > > > >> job is
> > > > > > > > >>>> not
> > > > > > > > >>>>>> processing backlog data. On the other hand, when event
> > > times
> > > > > are
> > > > > > > > >> far
> > > > > > > > >>>> from
> > > > > > > > >>>>>> processing time, causing watermarks to also be
> distant,
> > if
> > > > the
> > > > > > lag
> > > > > > > > >>>>>> surpasses the defined threshold, the job is considered
> > > > > > processing
> > > > > > > > >>>> backlog
> > > > > > > > >>>>>> data.
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> Best,
> > > > > > > > >>>>>> Xuannan
> > > > > > > > >>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>>>> On Aug 31, 2023, at 02:56, Jing Ge
> > > > > <j...@ververica.com.INVALID
> > > > > > >
> > > > > > > > >>>> wrote:
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> Hi Xuannan,
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> Thanks for the clarification. That is the part where
> I
> > am
> > > > > > trying
> > > > > > > > >> to
> > > > > > > > >>>>>>> understand your thoughts. I have some follow-up
> > > questions:
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> 1. It depends strongly on the watermarkStrategy and
> how
> > > > > > > > >> customized
> > > > > > > > >>>>>>> watermark generation looks like. It mixes business
> > logic
> > > > with
> > > > > > > > >>>> technical
> > > > > > > > >>>>>>> implementation and technical data processing mode.
> The
> > > > value
> > > > > of
> > > > > > > > >> the
> > > > > > > > >>>>>>> watermark lag threshold must be set very carefully.
> If
> > > the
> > > > > > value
> > > > > > > > >> is
> > > > > > > > >>>> too
> > > > > > > > >>>>>>> small. any time, when the watermark generation logic
> is
> > > > > > > > >>>> changed(business
> > > > > > > > >>>>>>> logic changes lead to the threshold getting
> exceeded),
> > > the
> > > > > same
> > > > > > > > >> job
> > > > > > > > >>>> might
> > > > > > > > >>>>>>> be running surprisingly in backlog processing mode,
> > i.e.
> > > a
> > > > > > > > >> butterfly
> > > > > > > > >>>>>>> effect. A comprehensive documentation is required to
> > > avoid
> > > > > any
> > > > > > > > >>>> confusion
> > > > > > > > >>>>>>> for the users.
> > > > > > > > >>>>>>> 2. Like Jark already mentioned, use cases that do not
> > > have
> > > > > > > > >>>> watermarks,
> > > > > > > > >>>>>>> like pure processing-time based stream processing[1]
> > are
> > > > not
> > > > > > > > >>>> covered. It
> > > > > > > > >>>>>> is
> > > > > > > > >>>>>>> more or less a trade-off solution that does not
> support
> > > > such
> > > > > > use
> > > > > > > > >>>> cases
> > > > > > > > >>>>>> and
> > > > > > > > >>>>>>> appropriate documentation is required. Forcing them
> to
> > > > > > explicitly
> > > > > > > > >>>>>> generate
> > > > > > > > >>>>>>> watermarks that are never needed just because of this
> > > does
> > > > > not
> > > > > > > > >> sound
> > > > > > > > >>>>>> like a
> > > > > > > > >>>>>>> proper solution.
> > > > > > > > >>>>>>> 3. If I am not mistaken, it only works for use cases
> > > where
> > > > > > event
> > > > > > > > >>>> times
> > > > > > > > >>>>>> are
> > > > > > > > >>>>>>> very close to the processing times, because the wall
> > > clock
> > > > is
> > > > > > > > >> used to
> > > > > > > > >>>>>>> calculate the watermark lag and the watermark is
> > > generated
> > > > > > based
> > > > > > > > >> on
> > > > > > > > >>>> the
> > > > > > > > >>>>>>> event time.
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> Best regards,
> > > > > > > > >>>>>>> Jing
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> [1]
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>
> > > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/2c50b4e956305426f478b726d4de4a640a16b810/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L236
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> On Wed, Aug 30, 2023 at 4:06 AM Xuannan Su <
> > > > > > > > >> suxuanna...@gmail.com>
> > > > > > > > >>>>>> wrote:
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>>> Hi Jing,
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> Thank you for the suggestion.
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> The definition of watermark lag is the same as the
> > > > > > watermarkLag
> > > > > > > > >>>> metric
> > > > > > > > >>>>>> in
> > > > > > > > >>>>>>>> FLIP-33[1]. More specifically, the watermark lag
> > > > calculation
> > > > > > is
> > > > > > > > >>>>>> computed at
> > > > > > > > >>>>>>>> the time when a watermark is emitted downstream in
> the
> > > > > > following
> > > > > > > > >>>> way:
> > > > > > > > >>>>>>>> watermarkLag = CurrentTime - Watermark. I have added
> > > this
> > > > > > > > >>>> description to
> > > > > > > > >>>>>>>> the FLIP.
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> I hope this addresses your concern.
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> Best,
> > > > > > > > >>>>>>>> Xuannan
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> [1]
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>
> > > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>> On Aug 28, 2023, at 01:04, Jing Ge
> > > > > > <j...@ververica.com.INVALID
> > > > > > > > >>>
> > > > > > > > >>>> wrote:
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> Hi Xuannan,
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> Thanks for the proposal. +1 for me.
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> There is one tiny thing that I am not sure if I
> > > > understand
> > > > > it
> > > > > > > > >>>>>> correctly.
> > > > > > > > >>>>>>>>> Since there will be many different
> > WatermarkStrategies
> > > > and
> > > > > > > > >>>> different
> > > > > > > > >>>>>>>>> WatermarkGenerators. Could you please update the
> FLIP
> > > and
> > > > > add
> > > > > > > > >> the
> > > > > > > > >>>>>>>>> description of how the watermark lag is calculated
> > > > exactly?
> > > > > > > > >> E.g.
> > > > > > > > >>>>>>>> Watermark
> > > > > > > > >>>>>>>>> lag = A - B with A is the timestamp of the
> watermark
> > > > > emitted
> > > > > > > > >> to the
> > > > > > > > >>>>>>>>> downstream and B is....(this is the part I am not
> > > really
> > > > > sure
> > > > > > > > >> after
> > > > > > > > >>>>>>>> reading
> > > > > > > > >>>>>>>>> the FLIP).
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> Best regards,
> > > > > > > > >>>>>>>>> Jing
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su <
> > > > > > > > >> suxuanna...@gmail.com>
> > > > > > > > >>>>>>>> wrote:
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>>> Hi Jark,
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> Thanks for the comments.
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> I agree that the current solution cannot support
> > jobs
> > > > that
> > > > > > > > >> cannot
> > > > > > > > >>>>>> define
> > > > > > > > >>>>>>>>>> watermarks. However, after considering the
> > > > > > > > >> pending-record-based
> > > > > > > > >>>>>>>> solution, I
> > > > > > > > >>>>>>>>>> believe the current solution is superior for the
> > > target
> > > > > use
> > > > > > > > >> case
> > > > > > > > >>>> as it
> > > > > > > > >>>>>>>> is
> > > > > > > > >>>>>>>>>> more intuitive for users. The backlog status gives
> > > users
> > > > > the
> > > > > > > > >>>> ability
> > > > > > > > >>>>>> to
> > > > > > > > >>>>>>>>>> balance between throughput and latency. Making
> this
> > > > > > trade-off
> > > > > > > > >>>> decision
> > > > > > > > >>>>>>>>>> based on the watermark lag is more intuitive from
> > the
> > > > > user's
> > > > > > > > >>>>>>>> perspective.
> > > > > > > > >>>>>>>>>> For instance, a user can decide that if the job
> lags
> > > > > behind
> > > > > > > > >> the
> > > > > > > > >>>>>> current
> > > > > > > > >>>>>>>>>> time by more than 1 hour, the result is not
> usable.
> > In
> > > > > that
> > > > > > > > >> case,
> > > > > > > > >>>> we
> > > > > > > > >>>>>> can
> > > > > > > > >>>>>>>>>> optimize for throughput when the data lags behind
> by
> > > > more
> > > > > > > > >> than an
> > > > > > > > >>>>>> hour.
> > > > > > > > >>>>>>>>>> With the pending-record-based solution, it's
> > > challenging
> > > > > for
> > > > > > > > >>>> users to
> > > > > > > > >>>>>>>>>> determine when to optimize for throughput and when
> > to
> > > > > > > > >> prioritize
> > > > > > > > >>>>>>>> latency.
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> Regarding the limitations of the watermark-based
> > > > solution:
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> 1. The current solution can support jobs with
> > sources
> > > > that
> > > > > > > > >> have
> > > > > > > > >>>> event
> > > > > > > > >>>>>>>>>> time. Users can always define a watermark at the
> > > source
> > > > > > > > >> operator,
> > > > > > > > >>>> even
> > > > > > > > >>>>>>>> if
> > > > > > > > >>>>>>>>>> it's not used by downstream operators, such as
> > > streaming
> > > > > > join
> > > > > > > > >> and
> > > > > > > > >>>>>>>> unbounded
> > > > > > > > >>>>>>>>>> aggregate.
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> 2.I don't believe it's accurate to say that the
> > > > watermark
> > > > > > lag
> > > > > > > > >> will
> > > > > > > > >>>>>> keep
> > > > > > > > >>>>>>>>>> increasing if no data is generated in Kafka. The
> > > > watermark
> > > > > > > > >> lag and
> > > > > > > > >>>>>>>> backlog
> > > > > > > > >>>>>>>>>> status are determined at the moment when the
> > watermark
> > > > is
> > > > > > > > >> emitted
> > > > > > > > >>>> to
> > > > > > > > >>>>>> the
> > > > > > > > >>>>>>>>>> downstream operator. If no data is emitted from
> the
> > > > > source,
> > > > > > > > >> the
> > > > > > > > >>>>>>>> watermark
> > > > > > > > >>>>>>>>>> lag and backlog status will not be updated. If the
> > > > > > > > >>>> WatermarkStrategy
> > > > > > > > >>>>>>>> with
> > > > > > > > >>>>>>>>>> idleness is used, the source becomes non-backlog
> > when
> > > it
> > > > > > > > >> becomes
> > > > > > > > >>>> idle.
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> 3. I think watermark lag is more intuitive to
> > > determine
> > > > > if a
> > > > > > > > >> job
> > > > > > > > >>>> is
> > > > > > > > >>>>>>>>>> processing backlog data. Even when using pending
> > > > records,
> > > > > it
> > > > > > > > >>>> faces a
> > > > > > > > >>>>>>>>>> similar issue. For example, if the source has 1K
> > > pending
> > > > > > > > >> records,
> > > > > > > > >>>>>> those
> > > > > > > > >>>>>>>>>> records can span from 1 day  to 1 hour to 1
> second.
> > If
> > > > the
> > > > > > > > >> records
> > > > > > > > >>>>>> span
> > > > > > > > >>>>>>>> 1
> > > > > > > > >>>>>>>>>> day, it's probably best to optimize for
> throughput.
> > If
> > > > > they
> > > > > > > > >> span 1
> > > > > > > > >>>>>>>> hour, it
> > > > > > > > >>>>>>>>>> depends on the business logic. If they span 1
> > second,
> > > > > > > > >> optimizing
> > > > > > > > >>>> for
> > > > > > > > >>>>>>>>>> latency is likely the better choice.
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> In summary, I believe the watermark-based solution
> > is
> > > a
> > > > > > > > >> superior
> > > > > > > > >>>>>> choice
> > > > > > > > >>>>>>>>>> for the target use case where watermark/event time
> > can
> > > > be
> > > > > > > > >> defined.
> > > > > > > > >>>>>>>>>> Additionally, I haven't come across a scenario
> that
> > > > > requires
> > > > > > > > >>>>>> low-latency
> > > > > > > > >>>>>>>>>> processing and reads from a source that cannot
> > define
> > > > > > > > >> watermarks.
> > > > > > > > >>>> If
> > > > > > > > >>>>>> we
> > > > > > > > >>>>>>>>>> encounter such a use case, we can create another
> > FLIP
> > > to
> > > > > > > > >> address
> > > > > > > > >>>> those
> > > > > > > > >>>>>>>>>> needs in the future. What do you think?
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> Best,
> > > > > > > > >>>>>>>>>> Xuannan
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>>> On Aug 20, 2023, at 23:27, Jark Wu <
> > imj...@gmail.com
> > > > > > > > >> <mailto:
> > > > > > > > >>>>>>>>>> imj...@gmail.com>> wrote:
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> Hi Xuannan,
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> Thanks for opening this discussion.
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> This current proposal may work in the mentioned
> > > > watermark
> > > > > > > > >> cases.
> > > > > > > > >>>>>>>>>>> However, it seems this is not a general solution
> > for
> > > > > > sources
> > > > > > > > >> to
> > > > > > > > >>>>>>>> determine
> > > > > > > > >>>>>>>>>>> "isProcessingBacklog".
> > > > > > > > >>>>>>>>>>> From my point of view, there are 3 limitations of
> > the
> > > > > > current
> > > > > > > > >>>>>> proposal:
> > > > > > > > >>>>>>>>>>> 1. It doesn't cover jobs that don't have
> > > > > > watermark/event-time
> > > > > > > > >>>>>> defined,
> > > > > > > > >>>>>>>>>>> for example streaming join and unbounded
> aggregate.
> > > We
> > > > > may
> > > > > > > > >> still
> > > > > > > > >>>> need
> > > > > > > > >>>>>>>> to
> > > > > > > > >>>>>>>>>>> figure out solutions for them.
> > > > > > > > >>>>>>>>>>> 2. Watermark lag can not be trusted, because it
> > > > increases
> > > > > > > > >>>> unlimited
> > > > > > > > >>>>>> if
> > > > > > > > >>>>>>>> no
> > > > > > > > >>>>>>>>>>> data is generated in the Kafka.
> > > > > > > > >>>>>>>>>>> But in this case, there is no backlog at all.
> > > > > > > > >>>>>>>>>>> 3. Watermark lag is hard to reflect the amount of
> > > > > backlog.
> > > > > > > > >> If the
> > > > > > > > >>>>>>>>>> watermark
> > > > > > > > >>>>>>>>>>> lag is 1day or 1 hour or 1second,
> > > > > > > > >>>>>>>>>>> there is possibly only 1 pending record there,
> > which
> > > > > means
> > > > > > no
> > > > > > > > >>>> backlog
> > > > > > > > >>>>>>>> at
> > > > > > > > >>>>>>>>>>> all.
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> Therefore, IMO, watermark maybe not the ideal
> > metric
> > > > used
> > > > > > to
> > > > > > > > >>>>>> determine
> > > > > > > > >>>>>>>>>>> "isProcessingBacklog".
> > > > > > > > >>>>>>>>>>> What we need is something that reflects the
> number
> > of
> > > > > > records
> > > > > > > > >>>>>>>> unprocessed
> > > > > > > > >>>>>>>>>>> by the job.
> > > > > > > > >>>>>>>>>>> Actually, that is the "pendingRecords" metric
> > > proposed
> > > > in
> > > > > > > > >>>> FLIP-33 and
> > > > > > > > >>>>>>>> has
> > > > > > > > >>>>>>>>>>> been implemented by Kafka source.
> > > > > > > > >>>>>>>>>>> Did you consider using "pendingRecords" metric to
> > > > > determine
> > > > > > > > >>>>>>>>>>> "isProcessingBacklog"?
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> Best,
> > > > > > > > >>>>>>>>>>> Jark
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> [1]
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>
> > > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > > > > > >>>>>>>>>> <
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>
> > > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> On Tue, 15 Aug 2023 at 12:04, Xintong Song <
> > > > > > > > >>>> tonysong...@gmail.com
> > > > > > > > >>>>>>>>>> <mailto:tonysong...@gmail.com>> wrote:
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> Sounds good to me.
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> It is true that, if we are introducing the
> > > generalized
> > > > > > > > >>>> watermark,
> > > > > > > > >>>>>>>> there
> > > > > > > > >>>>>>>>>>>> will be other watermark related concepts /
> > > > > configurations
> > > > > > > > >> that
> > > > > > > > >>>> need
> > > > > > > > >>>>>> to
> > > > > > > > >>>>>>>>>> be
> > > > > > > > >>>>>>>>>>>> updated anyway.
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> Best,
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> Xintong
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> On Tue, Aug 15, 2023 at 11:30 AM Xuannan Su <
> > > > > > > > >>>> suxuanna...@gmail.com
> > > > > > > > >>>>>>>>>> <mailto:suxuanna...@gmail.com>> wrote:
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> Hi Xingtong,
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> Thank you for your suggestion.
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> After considering the idea of using a general
> > > > > > configuration
> > > > > > > > >>>> key, I
> > > > > > > > >>>>>>>>>> think
> > > > > > > > >>>>>>>>>>>>> it may not be a good idea for the reasons
> below.
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> While I agree that using a more general
> > > configuration
> > > > > key
> > > > > > > > >>>> provides
> > > > > > > > >>>>>> us
> > > > > > > > >>>>>>>>>>>> with
> > > > > > > > >>>>>>>>>>>>> the flexibility to switch to other approaches
> to
> > > > > > calculate
> > > > > > > > >> the
> > > > > > > > >>>> lag
> > > > > > > > >>>>>> in
> > > > > > > > >>>>>>>>>> the
> > > > > > > > >>>>>>>>>>>>> future, the downside is that it may cause
> > confusion
> > > > for
> > > > > > > > >> users.
> > > > > > > > >>>> We
> > > > > > > > >>>>>>>>>>>> currently
> > > > > > > > >>>>>>>>>>>>> have fetchEventTimeLag, emitEventTimeLag, and
> > > > > > watermarkLag
> > > > > > > > >> in
> > > > > > > > >>>> the
> > > > > > > > >>>>>>>>>> source,
> > > > > > > > >>>>>>>>>>>>> and it is not clear which specific lag we are
> > > > referring
> > > > > > to.
> > > > > > > > >>>> With
> > > > > > > > >>>>>> the
> > > > > > > > >>>>>>>>>>>>> potential introduction of the Generalized
> > Watermark
> > > > > > > > >> mechanism
> > > > > > > > >>>> in
> > > > > > > > >>>>>> the
> > > > > > > > >>>>>>>>>>>>> future, if I understand correctly, a watermark
> > > won't
> > > > > > > > >>>> necessarily
> > > > > > > > >>>>>> need
> > > > > > > > >>>>>>>>>> to
> > > > > > > > >>>>>>>>>>>> be
> > > > > > > > >>>>>>>>>>>>> a timestamp. I am concern that the general
> > > > > configuration
> > > > > > > > >> key
> > > > > > > > >>>> may
> > > > > > > > >>>>>> not
> > > > > > > > >>>>>>>>>> be
> > > > > > > > >>>>>>>>>>>>> enough to cover all the use case and we will
> need
> > > to
> > > > > > > > >> introduce
> > > > > > > > >>>> a
> > > > > > > > >>>>>>>>>> general
> > > > > > > > >>>>>>>>>>>>> way to determine the backlog status regardless.
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> For the reasons above, I prefer introducing the
> > > > > > > > >> configuration
> > > > > > > > >>>> as
> > > > > > > > >>>>>> is,
> > > > > > > > >>>>>>>>>> and
> > > > > > > > >>>>>>>>>>>>> change it later with the a deprecation process
> or
> > > > > > migration
> > > > > > > > >>>>>> process.
> > > > > > > > >>>>>>>>>> What
> > > > > > > > >>>>>>>>>>>>> do you think?
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> Best,
> > > > > > > > >>>>>>>>>>>>> Xuannan
> > > > > > > > >>>>>>>>>>>>> On Aug 14, 2023, 14:09 +0800, Xintong Song <
> > > > > > > > >>>> tonysong...@gmail.com
> > > > > > > > >>>>>>>>>> <mailto:tonysong...@gmail.com>>,
> > > > > > > > >>>>>>>>>>>> wrote:
> > > > > > > > >>>>>>>>>>>>>> Thanks for the explanation.
> > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>> I wonder if it makes sense to not expose this
> > > detail
> > > > > via
> > > > > > > > >> the
> > > > > > > > >>>>>>>>>>>>> configuration
> > > > > > > > >>>>>>>>>>>>>> option. To be specific, I suggest not
> mentioning
> > > the
> > > > > > > > >>>> "watermark"
> > > > > > > > >>>>>>>>>>>> keyword
> > > > > > > > >>>>>>>>>>>>> in
> > > > > > > > >>>>>>>>>>>>>> the configuration key and description.
> > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>> - From the users' perspective, I think they
> only
> > > > need
> > > > > to
> > > > > > > > >> know
> > > > > > > > >>>>>>>> there's
> > > > > > > > >>>>>>>>>> a
> > > > > > > > >>>>>>>>>>>>>> lag higher than the given threshold, Flink
> will
> > > > > consider
> > > > > > > > >>>> latency
> > > > > > > > >>>>>> of
> > > > > > > > >>>>>>>>>>>>>> individual records as less important and
> > > prioritize
> > > > > > > > >> throughput
> > > > > > > > >>>>>> over
> > > > > > > > >>>>>>>>>> it.
> > > > > > > > >>>>>>>>>>>>>> They don't really need the details of how the
> > lags
> > > > are
> > > > > > > > >>>> calculated.
> > > > > > > > >>>>>>>>>>>>>> - For the internal implementation, I also
> think
> > > > using
> > > > > > > > >>>> watermark
> > > > > > > > >>>>>> lags
> > > > > > > > >>>>>>>>>> is
> > > > > > > > >>>>>>>>>>>>>> a good idea, for the reasons you've already
> > > > mentioned.
> > > > > > > > >>>> However,
> > > > > > > > >>>>>> it's
> > > > > > > > >>>>>>>>>>>> not
> > > > > > > > >>>>>>>>>>>>>> the only possible option. Hiding this detail
> > from
> > > > > users
> > > > > > > > >> would
> > > > > > > > >>>> give
> > > > > > > > >>>>>>>> us
> > > > > > > > >>>>>>>>>>>> the
> > > > > > > > >>>>>>>>>>>>>> flexibility to switch to other approaches if
> > > needed
> > > > in
> > > > > > > > >> future.
> > > > > > > > >>>>>>>>>>>>>> - We are currently working on designing the
> > > > > > > > >> ProcessFunction
> > > > > > > > >>>> API
> > > > > > > > >>>>>>>>>>>>>> (consider it as a DataStream API V2). There's
> an
> > > > idea
> > > > > to
> > > > > > > > >>>>>> introduce a
> > > > > > > > >>>>>>>>>>>>>> Generalized Watermark mechanism, where
> basically
> > > the
> > > > > > > > >>>> watermark can
> > > > > > > > >>>>>>>> be
> > > > > > > > >>>>>>>>>>>>>> anything that needs to travel along the
> > data-flow
> > > > with
> > > > > > > > >> certain
> > > > > > > > >>>>>>>>>>>> alignment
> > > > > > > > >>>>>>>>>>>>>> strategies, and event time watermark would be
> > one
> > > > > > specific
> > > > > > > > >>>> case of
> > > > > > > > >>>>>>>> it.
> > > > > > > > >>>>>>>>>>>>> This
> > > > > > > > >>>>>>>>>>>>>> is still an idea and has not been discussed
> and
> > > > agreed
> > > > > > on
> > > > > > > > >> by
> > > > > > > > >>>> the
> > > > > > > > >>>>>>>>>>>>> community,
> > > > > > > > >>>>>>>>>>>>>> and we are preparing a FLIP for it. But if we
> > are
> > > > > going
> > > > > > > > >> for
> > > > > > > > >>>> it,
> > > > > > > > >>>>>> the
> > > > > > > > >>>>>>>>>>>>> concept
> > > > > > > > >>>>>>>>>>>>>> "watermark-lag-threshold" could be ambiguous.
> > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>> I do not intend to block the FLIP on this. I'd
> > > also
> > > > be
> > > > > > > > >> fine
> > > > > > > > >>>> with
> > > > > > > > >>>>>>>>>>>>>> introducing the configuration as is, and
> > changing
> > > it
> > > > > > > > >> later, if
> > > > > > > > >>>>>>>> needed,
> > > > > > > > >>>>>>>>>>>>> with
> > > > > > > > >>>>>>>>>>>>>> a regular deprecation and migration process.
> > Just
> > > > > making
> > > > > > > > >> my
> > > > > > > > >>>>>>>>>>>> suggestions.
> > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>> Best,
> > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>> Xintong
> > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>> On Mon, Aug 14, 2023 at 12:00 PM Xuannan Su <
> > > > > > > > >>>>>> suxuanna...@gmail.com
> > > > > > > > >>>>>>>>>> <mailto:suxuanna...@gmail.com>>
> > > > > > > > >>>>>>>>>>>>> wrote:
> > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>>> Hi Xintong,
> > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>>> Thanks for the reply.
> > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>>> I have considered using the timestamp in the
> > > > records
> > > > > to
> > > > > > > > >>>> determine
> > > > > > > > >>>>>>>> the
> > > > > > > > >>>>>>>>>>>>>>> backlog status, and decided to use watermark
> at
> > > the
> > > > > > end.
> > > > > > > > >> By
> > > > > > > > >>>>>>>>>>>> definition,
> > > > > > > > >>>>>>>>>>>>>>> watermark is the time progress indication in
> > the
> > > > data
> > > > > > > > >>>> stream. It
> > > > > > > > >>>>>>>>>>>>> indicates
> > > > > > > > >>>>>>>>>>>>>>> the stream’s event time has progressed to
> some
> > > > > specific
> > > > > > > > >>>> time. On
> > > > > > > > >>>>>>>> the
> > > > > > > > >>>>>>>>>>>>> other
> > > > > > > > >>>>>>>>>>>>>>> hand, timestamp in the records is usually
> used
> > to
> > > > > > > > >> generate
> > > > > > > > >>>> the
> > > > > > > > >>>>>>>>>>>>> watermark.
> > > > > > > > >>>>>>>>>>>>>>> Therefore, it appears more appropriate and
> > > > intuitive
> > > > > to
> > > > > > > > >>>> calculate
> > > > > > > > >>>>>>>> the
> > > > > > > > >>>>>>>>>>>>> event
> > > > > > > > >>>>>>>>>>>>>>> time lag by watermark and determine the
> backlog
> > > > > status.
> > > > > > > > >> And
> > > > > > > > >>>> by
> > > > > > > > >>>>>>>> using
> > > > > > > > >>>>>>>>>>>>> the
> > > > > > > > >>>>>>>>>>>>>>> watermark, we can easily deal with the
> > > out-of-order
> > > > > and
> > > > > > > > >> the
> > > > > > > > >>>>>>>> idleness
> > > > > > > > >>>>>>>>>>>>> of the
> > > > > > > > >>>>>>>>>>>>>>> data.
> > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>>> Please let me know if you have further
> > questions.
> > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>>> Best,
> > > > > > > > >>>>>>>>>>>>>>> Xuannan
> > > > > > > > >>>>>>>>>>>>>>> On Aug 10, 2023, 20:23 +0800, Xintong Song <
> > > > > > > > >>>>>> tonysong...@gmail.com
> > > > > > > > >>>>>>>>>> <mailto:tonysong...@gmail.com>>,
> > > > > > > > >>>>>>>>>>>>> wrote:
> > > > > > > > >>>>>>>>>>>>>>>> Thanks for preparing the FLIP, Xuannan.
> > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>>>> +1 in general.
> > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>>>> A quick question, could you explain why we
> are
> > > > > relying
> > > > > > > > >> on
> > > > > > > > >>>> the
> > > > > > > > >>>>>>>>>>>>> watermark
> > > > > > > > >>>>>>>>>>>>>>> for
> > > > > > > > >>>>>>>>>>>>>>>> emitting the record attribute? Why not use
> > > > > timestamps
> > > > > > > > >> in the
> > > > > > > > >>>>>>>>>>>>> records? I
> > > > > > > > >>>>>>>>>>>>>>>> don't see any concern in using watermarks.
> > Just
> > > > > > > > >> wondering if
> > > > > > > > >>>>>>>>>>>> there's
> > > > > > > > >>>>>>>>>>>>> any
> > > > > > > > >>>>>>>>>>>>>>>> deep considerations behind this.
> > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>>>> Best,
> > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>>>> Xintong
> > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>>>> On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su <
> > > > > > > > >>>>>> suxuanna...@gmail.com
> > > > > > > > >>>>>>>>>> <mailto:suxuanna...@gmail.com>>
> > > > > > > > >>>>>>>>>>>>> wrote:
> > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>>>>> Hi all,
> > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>>>>> I am opening this thread to discuss
> FLIP-328:
> > > > Allow
> > > > > > > > >> source
> > > > > > > > >>>>>>>>>>>>> operators to
> > > > > > > > >>>>>>>>>>>>>>>>> determine isProcessingBacklog based on
> > > watermark
> > > > > > > > >> lag[1]. We
> > > > > > > > >>>>>> had a
> > > > > > > > >>>>>>>>>>>>>>> several
> > > > > > > > >>>>>>>>>>>>>>>>> discussions with Dong Ling about the
> design,
> > > and
> > > > > > thanks
> > > > > > > > >>>> for all
> > > > > > > > >>>>>>>>>>>> the
> > > > > > > > >>>>>>>>>>>>>>>>> valuable advice.
> > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>>>>> The FLIP aims to target the use-case where
> > user
> > > > > want
> > > > > > to
> > > > > > > > >>>> run a
> > > > > > > > >>>>>>>>>>>> Flink
> > > > > > > > >>>>>>>>>>>>>>> job to
> > > > > > > > >>>>>>>>>>>>>>>>> backfill historical data in a high
> throughput
> > > > > manner
> > > > > > > > >> and
> > > > > > > > >>>>>> continue
> > > > > > > > >>>>>>>>>>>>>>>>> processing real-time data with low latency.
> > > > > Building
> > > > > > > > >> upon
> > > > > > > > >>>> the
> > > > > > > > >>>>>>>>>>>>> backlog
> > > > > > > > >>>>>>>>>>>>>>>>> concept introduced in FLIP-309[2], this
> > > proposal
> > > > > > > > >> enables
> > > > > > > > >>>>>> sources
> > > > > > > > >>>>>>>>>>>> to
> > > > > > > > >>>>>>>>>>>>>>> report
> > > > > > > > >>>>>>>>>>>>>>>>> their status of processing backlog based on
> > the
> > > > > > > > >> watermark
> > > > > > > > >>>> lag.
> > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>>>>> We would greatly appreciate any comments or
> > > > > feedback
> > > > > > > > >> you
> > > > > > > > >>>> may
> > > > > > > > >>>>>> have
> > > > > > > > >>>>>>>>>>>>> on
> > > > > > > > >>>>>>>>>>>>>>> this
> > > > > > > > >>>>>>>>>>>>>>>>> proposal.
> > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>>>>> Best,
> > > > > > > > >>>>>>>>>>>>>>>>> Xuannan
> > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>>>>> [1]
> > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>
> > > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> > > > > > > > >>>>>>>>>> <
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>
> > > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>>>>> [2]
> > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>
> > > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > > > > > > > >>>>>>>>>> <
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>
> > > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to