Hi Jark,

Please see my reply inline.

On Fri, Sep 15, 2023 at 2:01 PM Jark Wu <imj...@gmail.com> wrote:

> Hi Dong,
>
> > Note that we can not simply enforce the semantics of "any invocation of
> > setIsProcessingBacklog(false) will set the job's backlog status to false".
> > Suppose we have a job with two operators, where operatorA invokes
> > setIsProcessingBacklog(false) and operatorB invokes
> > setIsProcessingBacklog(true). There will be conflict if we use the
> > semantics of "any invocation of setIsProcessingBacklog(false) will set the
> > job's backlog status to false".
>
> So it should set job's backlog status to false if the job has only a single
> source, right?
>

As of FLIP-309's implementation, with the constraint that there is only
one API to set backlog=true, your understanding is correct.

However, it might be useful to recall that FLIP-309 was proposed with the
explicit plan to add more strategies to set backlog=true, which is
documented in its future work section. The above statement, which assumes
there is only one strategy, will not hold in the long term.

With this knowledge in mind, a better and more long-lasting way to
interpret the semantics of setIsProcessingBacklog() would be this: "it
should set job's backlog status to false if the job has only a single
source AND there is no other strategy to set backlog=true".


> Could you elaborate on the behavior if there is a job with a single source,
> and the watermark lag exceeds the configured value (should set backlog to
> true?), but the source invokes "setIsProcessingBacklog(false)"? Or the
> inverse one, the source invokes "setIsProcessingBacklog(false)" first, but
> the watermark lag exceeds the configured value.
>

If setIsProcessingBacklog(false) is invoked and the watermark lag exceeds
the configured value, then the job's backlog status is set to true.

The rationale is that if any strategy is triggered and says backlog=true,
then the job's backlog status should be true. Otherwise, the job's
backlog status is false.

The key idea to avoid conflict is that we will not add any API with the
capability to explicitly set the job's backlog status to false. A job's backlog
status is false if and only if no rule says it should be set to true.
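
To make the rule concrete, here is a minimal sketch of the OR-combination
described above. It is only an illustration, not Flink's actual
implementation: the class name, the report() method, and the strategy names
"source-api" and "watermark-lag" are all made up for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of the "backlog=true wins" rule; not Flink code. */
final class BacklogStatusSketch {
    // Latest vote per strategy. The strategy names are assumptions for illustration.
    private final Map<String, Boolean> votes = new LinkedHashMap<>();

    /** Records the latest vote of one strategy (e.g. a source API call or the lag check). */
    void report(String strategy, boolean isBacklog) {
        votes.put(strategy, isBacklog);
    }

    /** The job is processing backlog iff at least one strategy currently says true. */
    boolean isJobProcessingBacklog() {
        return votes.containsValue(true);
    }

    public static void main(String[] args) {
        BacklogStatusSketch job = new BacklogStatusSketch();
        // The source invokes setIsProcessingBacklog(false) ...
        job.report("source-api", false);
        // ... while the watermark lag exceeds the configured threshold.
        job.report("watermark-lag", true);
        System.out.println(job.isJobProcessingBacklog());
        // Once the lag drops below the threshold, no strategy says true anymore.
        job.report("watermark-lag", false);
        System.out.println(job.isJobProcessingBacklog());
    }
}
```

In this sketch a false vote never overrides anything. It only withdraws that
strategy's own earlier true vote, which is why the order of the two calls in
your example does not matter.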

BTW, I can understand that setIsProcessingBacklog(false) appears to suggest
the job's backlog is set to false. We can certainly update its Javadoc to
make the semantics clearer. If you have any suggestions for a better name for
this method, we can also update it accordingly.

What do you think?


> This is the conflict I'm concerned about.
>
> Best,
> Jark
>
> On Fri, 15 Sept 2023 at 12:00, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hi Jark,
> >
> > Please see my comments inline.
> >
> > On Fri, Sep 15, 2023 at 10:35 AM Jark Wu <imj...@gmail.com> wrote:
> >
> > > Hi Dong,
> > >
> > > Please see my comments inline below.
> >
> >
> > > > Hmm.. can you explain what you mean by "different watermark delay
> > > > definitions for each source"?
> > >
> > > For example, "table1" defines a watermark with delay 5 seconds,
> > > "table2" defines a watermark with delay 10 seconds. They have different
> > > watermark delay definitions. So it is also reasonable they have
> different
> > > watermark lag definitions, e.g., "table1" allows "10mins" and "table2"
> > > allows "20mins".
> > >
> >
> > I think the watermark delay you mentioned above is conceptually /
> > fundamentally different from the watermark-lag-threshold proposed in this
> > FLIP.
> >
> > It might be useful to revisit the semantics of these two concepts:
> > - watermark delay is used to account for the maximum amount of
> > out-of-orderliness that users expect (or are willing to wait for) for
> > records from a given source.
> > - watermark-lag-threshold is used to define when processing latency is no
> > longer important (e.g. because data is already stale).
> >
> > Even though users might expect different out of orderliness for different
> > sources, users do not necessarily have different definitions / thresholds
> > for when a record is considered "already stale".
> >
> >
> > >
> > > > I think there is probably misunderstanding here. FLIP-309 does NOT
> > > directly
> > > > specify when backlog is false. It is intentionally specified in such
> a
> > > way
> > > > that there will  not be any conflict between these rules.
> > >
> > > Do you mean FLIP-309 doesn't allow to specify backlog to be false?
> > > Is this mentioned in FLIP-309? This is completely different from what I
> > >
> >
> > Can you explain what you mean by "allow to specify backlog to be false"?
> >
> > If what you mean is that "can invoke setIsProcessingBacklog(false)", then
> > FLIP-309 supports doing this.
> >
> > If what you mean is that "any invocation of setIsProcessingBacklog(false)
> > will set the job's backlog status to false", then FLIP-309 does not support
> > this. I believe the existing Javadoc of this API and FLIP-309 are
> > compatible with this explanation.
> >
> > Note that we can not simply enforce the semantics of "any invocation of
> > setIsProcessingBacklog(false) will set the job's backlog status to false".
> > Suppose we have a job with two operators, where operatorA invokes
> > setIsProcessingBacklog(false) and operatorB invokes
> > setIsProcessingBacklog(true). There will be conflict if we use the
> > semantics of "any invocation of setIsProcessingBacklog(false) will set the
> > job's backlog status to false".
> >
> > Would this answer your question?
> >
> > Best,
> > Dong
> >
> >
> > > understand. From the API interface
> "ctx.setIsProcessingBacklog(boolean)",
> > > it allows users to invoke "setIsProcessingBacklog(false)". And FLIP-309
> > > also says "MySQL CDC source should report isProcessingBacklog=false
> > > at the beginning of the changelog stage." If not, maybe we need to
> > revisit
> > > FLIP-309.
> >
> >
> > > Best,
> > > Jark
> > >
> > >
> > >
> > > On Fri, 15 Sept 2023 at 08:41, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hi Jark,
> > > >
> > > > Do you have any follow-up comment?
> > > >
> > > > My gut feeling is that suppose we need to support per-source
> watermark
> > > lag
> > > > specification in the future (not sure we have a use-case for this
> right
> > > > now), we can add such a config in the future with a follow-up FLIP.
> The
> > > > job-level config will still be useful as it makes users'
> configuration
> > > > simpler for common scenarios.
> > > >
> > > > If it is OK, can we agree to make incremental progress for Flink and
> > > start
> > > > a voting thread for this FLIP?
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > >
> > > > On Mon, Sep 11, 2023 at 4:41 PM Jark Wu <imj...@gmail.com> wrote:
> > > >
> > > > > Hi Dong,
> > > > >
> > > > > Please see my comments inline.
> > > > >
> > > > > >  As a result, the proposed job-level
> > > > > > config will be applied only in the changelog stage. So there is
> no
> > > > > > difference between these two approaches in this particular case,
> > > right?
> > > > >
> > > > > How the job-level config can be applied ONLY in the changelog
> stage?
> > > > > I think it is only possible if it is implemented by the CDC source
> > > > itself,
> > > > > because the framework doesn't know which stage of the source is.
> > > > > Know that the CDC source may emit watermarks with a very small lag
> > > > > in the snapshot stage, and the job-level config may turn the
> backlog
> > > > > status into false.
> > > > >
> > > > > > On the other hand, per-source config will be necessary if users
> > want
> > > to
> > > > > > apply different watermark lag thresholds for different sources in
> > the
> > > > > same
> > > > > > job.
> > > > >
> > > > > We also have different watermark delay definitions for each source,
> > > > > I think this's also reasonable and necessary to have different
> > > watermark
> > > > > lags.
> > > > >
> > > > >
> > > > > > Each source can have its own rule that specifies when the backlog
> > can
> > > > be
> > > > > true
> > > > > > (e.g. MySql CDC says the backlog should be true during the
> snapshot
> > > > > stage).
> > > > > > And we can have a job-level config that specifies when the
> backlog
> > > > should
> > > > > > be true. Note that it is designed in such a way that none of
> these
> > > > rules
> > > > > > specify when the backlog should be false. That is why there is no
> > > > > conflict
> > > > > > by definition.
> > > > >
> > > > > IIUC, FLIP-309 provides `setIsProcessingBacklog` to specify when
> the
> > > > > backlog
> > > > > is true and when is FALSE. This conflicts with the job-level config
> > as
> > > it
> > > > > will turn
> > > > > the status into true.
> > > > >
> > > > > > If I understand your comments correctly, you mean that we might
> > have
> > > a
> > > > > > Flink SQL DDL with user-defined watermark expressions. And users
> > also
> > > > > want
> > > > > > to set the backlog to true if the watermark generated by that
> > > > > > user-specified expression exceeds a threshold.
> > > > >
> > > > > No. I mean the source may not support generating watermarks, so the
> > > > > watermark
> > > > > expression is applied in a following operator (instead of in the
> > source
> > > > > operator).
> > > > > This will result in the watermark lag doesn't work in this case and
> > > > confuse
> > > > > users.
> > > > >
> > > > > > You are right that this is a limitation. However, this is only a
> > > > > short-term
> > > > > > limitation which we added to make sure that we can focus on the
> > > > > capability
> > > > > > to switch from backlog=true to backlog=false. In the future, we
> > will
> > > > > remove
> > > > > > this limitation and also support switching from backlog=false to
> > > > > > backlog=true.
> > > > >
> > > > > I can understand it may be difficult to support runtime mode
> > switching
> > > > back
> > > > > and forth.
> > > > > However, I think this should be a limitation of FLIP-327, not
> > FLIP-328.
> > > > > IIUC,
> > > > > FLIP-309 doesn't have this limitation, right? I just don't
> understand
> > > > > what's the
> > > > > challenge to switch a flag?
> > > > >
> > > > > Best,
> > > > > Jark
> > > > >
> > > > >
> > > > > On Sun, 10 Sept 2023 at 19:44, Dong Lin <lindon...@gmail.com>
> wrote:
> > > > >
> > > > > > Hi Jark,
> > > > > >
> > > > > > Thanks for the comments. Please see my comments inline.
> > > > > >
> > > > > > On Sat, Sep 9, 2023 at 4:13 PM Jark Wu <imj...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Xuannan,
> > > > > > >
> > > > > > > I leave my comments inline.
> > > > > > >
> > > > > > > > In the case where a user wants to
> > > > > > > > use a CDC source and also determine backlog status based on
> > > > watermark
> > > > > > > > lag, we still need to define the rule when that occurs
> > > > > > >
> > > > > > > This rule should be defined by the source itself (who knows
> > backlog
> > > > > > best),
> > > > > > > not by the framework. In the case of CDC source, it reports
> > > > > > isBacklog=true
> > > > > > > during snapshot stage, and report isBacklog=false during
> > changelog
> > > > > stage
> > > > > > if
> > > > > > > watermark-lag is within the threshold.
> > > > > > >
> > > > > >
> > > > > > I am not sure I fully understand the difference between adding a
> > > > > job-level
> > > > > > config vs. adding a per-source config.
> > > > > >
> > > > > > In the case of CDC, its watermark lag should be either undefined or
> > > > > > really large in the snapshot stage. As a result, the proposed job-level
> > > > > > config will be applied only in the changelog stage. So there is no
> > > > > > difference between these two approaches in this particular case, right?
> > > > > >
> > > > > > There are two advantages of the job-level config over per-source
> > > > config:
> > > > > >
> > > > > > 1) Configuration is simpler. For example, suppose a user has a
> > Flink
> > > > job
> > > > > > that consumes records from multiple Kafka sources and wants to
> > > > determine
> > > > > > backlog status for these Kafka sources using the same watermark
> lag
> > > > > > threshold, there is no need for users to repeatedly specify this
> > > > > threshold
> > > > > > for each source.
> > > > > >
> > > > > > 2) There is a smaller number of public APIs overall. In particular,
> > > > > > instead of repeatedly adding a setProcessingBacklogWatermarkLagThreshold()
> > > > > > API for every source operator that has event-time watermark lag defined,
> > > > > > we only need to add one job-level config. Less public API means better
> > > > > > simplicity and maintainability in general.
> > > > > >
> > > > > > On the other hand, per-source config will be necessary if users
> > want
> > > to
> > > > > > apply different watermark lag thresholds for different sources in
> > the
> > > > > same
> > > > > > job. Personally, I find this a bit counter-intuitive for users to
> > > > specify
> > > > > > different watermark lag thresholds in the same job.
> > > > > >
> > > > > > Do you think there is any real-world use-case that requires this? Could
> > > > > > you provide a specific use-case where per-source config can provide an
> > > > > > advantage over the job-level config?
> > > > > >
> > > > > >
> > > > > > > I think it's not intuitive to combine it with the logical OR
> > > > operation.
> > > > > > > Even for the
> > > > > > > combination logic of backlog status from different channels,
> > > FLIP-309
> > > > > > said
> > > > > > > it is
> > > > > > > "up to the operator to determine its output records' isBacklog
> > > value"
> > > > > and
> > > > > > > proposed
> > > > > > > 3 different strategies. Therefore, I think backlog status from
> a
> > > > single
> > > > > > > source should
> > > > > > > be up to the source.
> > > > > >
> > > > > >
> > > > > > For both the job-level config and the per-source config, it is
> > > > eventually
> > > > > > up to the user to decide the computation logic of the backlog
> > status.
> > > > > > Whether this mechanism is implemented at the per-source level or
> > > > > framework
> > > > > > level is probably more like an implementation detail.
> > > > > >
> > > > > > Eventually, I think the choice between these two approaches
> depends
> > > on
> > > > > > whether we have any use-case for users to specify different
> > watermark
> > > > lag
> > > > > > thresholds in the same job.
> > > > > >
> > > > > >
> > > > > > >
> > > > > > > IMO, a better API design is not how to resolve conflicts but
> not
> > > > > > > introducing conflicts.
> > > > > >
> > > > > >
> > > > > > Just to clarify, the current FLIP does not introduce any
> conflict.
> > > Each
> > > > > > source can have its own rule that specifies when the backlog can
> be
> > > > true
> > > > > > (e.g. MySql CDC says the backlog should be true during the
> snapshot
> > > > > stage).
> > > > > > And we can have a job-level config that specifies when the
> backlog
> > > > should
> > > > > > be true. Note that it is designed in such a way that none of
> these
> > > > rules
> > > > > > specify when the backlog should be false. That is why there is no
> > > > > conflict
> > > > > > by definition.
> > > > > >
> > > > > >
> > > > > >
> > > > > > > Letting the source determine backlog status removes the conflicts and I
> > > > > > > don't see big disadvantages.
> > > > > > >
> > > > > > > > It should not confuse the user that
> > > > > > > > DataStream#assignTimestampsAndWatermarks doesn't work with
> > > > > > > > backlog.watermark-lag-threshold, as it is not a source.
> > > > > > >
> > > > > > > Hmm, so this configuration may confuse Flink SQL users, because
> > all
> > > > > > > watermarks
> > > > > > > are defined on the source DDL, but it may use a separate
> operator
> > > to
> > > > > emit
> > > > > > > watermarks
> > > > > > > if the source doesn't support emitting watermarks.
> > > > > > >
> > > > > >
> > > > > > If I understand your comments correctly, you mean that we might
> > have
> > > a
> > > > > > Flink SQL DDL with user-defined watermark expressions. And users
> > also
> > > > > want
> > > > > > to set the backlog to true if the watermark generated by that
> > > > > > user-specified expression exceeds a threshold.
> > > > > >
> > > > > > That is a good point and use-case. I agree we should also cover
> > this
> > > > > > scenario. And we can update FLIP-328 to mention that the
> job-level
> > > > config
> > > > > > will also be applicable when the watermark derived from the Flink
> > SQL
> > > > DDL
> > > > > > exceeds this threshold. Would this address your concern?
> > > > > >
> > > > > >
> > > > > > >
> > > > > > > > I think the description in the FLIP actually means the other
> > way
> > > > > > > > around, where the job can never switch back to batch mode
> once
> > it
> > > > has
> > > > > > > > switched into streaming mode. This is to align with the
> current
> > > > state
> > > > > > > > of FLIP-327[1], where only switching from batch to stream
> mode
> > is
> > > > > > > > supported.
> > > > > > >
> > > > > > > This sounds like a limitation of FLIP-327 (that execution mode
> > > > depends
> > > > > on
> > > > > > > backlog status).
> > > > > > > But the backlog status shouldn't have this limitation, because
> it
> > > is
> > > > > not
> > > > > > > only used for execution
> > > > > > > switching.
> > > > > > >
> > > > > >
> > > > > > You are right that this is a limitation. However, this is only a
> > > > > short-term
> > > > > > limitation which we added to make sure that we can focus on the
> > > > > capability
> > > > > > to switch from backlog=true to backlog=false. In the future, we
> > will
> > > > > remove
> > > > > > this limitation and also support switching from backlog=false to
> > > > > > backlog=true.
> > > > > >
> > > > > > The capability to switch from backlog=true to backlog=false will
> > > > > mitigate a
> > > > > > lot of problems we are facing now. As it is common for users to
> > > start a
> > > > > > Flink job to process backlog data followed by real-time data. On
> > the
> > > > > other
> > > > > > hand, switching from backlog=false to backlog=true is useful when
> > > there
> > > > > is
> > > > > > a traffic spike while the Flink job is processing real-time data,
> > > which
> > > > > is
> > > > > > also useful to address but less important than the previous one.
> > > > > >
> > > > > > Given that both features require considerable changes to the
> > > underlying
> > > > > > runtime, we think it might be useful and safe to tackle them one
> by
> > > > one.
> > > > > >
> > > > > > Thanks again for the comments. Please let us know what you think.
> > > > > >
> > > > > > Best,
> > > > > > Dong
> > > > > >
> > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > > Jark
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Fri, 8 Sept 2023 at 19:09, Xuannan Su <
> suxuanna...@gmail.com>
> > > > > wrote:
> > > > > > >
> > > > > > > > Hi Jark and Leonard,
> > > > > > > >
> > > > > > > > Thanks for the comments. Please see my reply below.
> > > > > > > >
> > > > > > > > @Jark
> > > > > > > >
> > > > > > > > > I think a better API doesn't compete with itself.
> Therefore,
> > > I'm
> > > > in
> > > > > > > > favor of
> > > > > > > > > supporting the watermark lag threshold for each source
> > without
> > > > > > > > introducing
> > > > > > > > > any framework API and configuration.
> > > > > > > >
> > > > > > > > I don't think supporting the watermark lag threshold for each
> > > > source
> > > > > > > > can avoid the competition problem. In the case where a user
> > wants
> > > > to
> > > > > > > > use a CDC source and also determine backlog status based on
> > > > watermark
> > > > > > > > lag, we still need to define the rule when that occurs. With
> > that
> > > > > > > > said, I think it is more intuitive to combine it with the
> > logical
> > > > OR
> > > > > > > > operation, as the strategies (FLIP-309, FLIP-328) only
> > determine
> > > > when
> > > > > > > > the source's backlog status should be True. What do you
> think?
> > > > > > > >
> > > > > > > > > Besides, this can address another concern that the watermark may be
> > > > > > > > > generated by DataStream#assignTimestampsAndWatermarks which doesn't
> > > > > > > > > work with the backlog.watermark-lag-threshold job config
> > > > > > > >
> > > > > > > > The description of the configuration explicitly states that
> "a
> > > > source
> > > > > > > > would report isProcessingBacklog=true if its watermark lag
> > > exceeds
> > > > > the
> > > > > > > > configured value". It should not confuse the user that
> > > > > > > > DataStream#assignTimestampsAndWatermarks doesn't work with
> > > > > > > > backlog.watermark-lag-threshold, as it is not a source.
> > > > > > > >
> > > > > > > > > Does that mean the job can never back to streaming mode
> once
> > > > > switches
> > > > > > > > into
> > > > > > > > > backlog mode? It sounds like not a complete FLIP to me. Is
> it
> > > > > > possible
> > > > > > > to
> > > > > > > > > support switching back in this FLIP?
> > > > > > > >
> > > > > > > > I think the description in the FLIP actually means the other
> > way
> > > > > > > > around, where the job can never switch back to batch mode
> once
> > it
> > > > has
> > > > > > > > switched into streaming mode. This is to align with the
> current
> > > > state
> > > > > > > > of FLIP-327[1], where only switching from batch to stream
> mode
> > is
> > > > > > > > supported.
> > > > > > > >
> > > > > > > > @Leonard
> > > > > > > >
> > > > > > > > > > The FLIP describe that: And it should report
> > > > > > > isProcessingBacklog=false
> > > > > > > > at the beginning of the snapshot stage.
> > > > > > > > > This should be “changelog stage”
> > > > > > > >
> > > > > > > > I think the description is in FLIP-309. Thanks for pointing
> > out.
> > > I
> > > > > > > > updated the description.
> > > > > > > >
> > > > > > > > > I'm not sure if it's enough to support this feature only in
> > > > FLIP-27
> > > > > > > > Source. Although we are pushing the sourceFunction API to be
> > > > removed,
> > > > > > > these
> > > > > > > > APIs will be survive one or two versions in flink repo before
> > > they
> > > > > are
> > > > > > > > actually removed.
> > > > > > > >
> > > > > > > > I agree that it is good to support the SourceFunction API.
> > > However,
> > > > > > > > given that the SourceFunction API is marked as deprecated, I
> > > think
> > > > I
> > > > > > > > will prioritize supporting the FLIP-27 Source. We can support
> > the
> > > > > > > > SourceFunction API after the
> > > > > > > > FLIP-27 source. What do you think?
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > > Xuannan
> > > > > > > >
> > > > > > > > [1]
> > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Sep 8, 2023 at 1:02 AM Leonard Xu <xbjt...@gmail.com
> >
> > > > wrote:
> > > > > > > > >
> > > > > > > > > Thanks Xuannan for driving this FLIP !
> > > > > > > > >
> > > > > > > > > The proposal generally looks good to me, but I still left
> > some
> > > > > > > comments:
> > > > > > > > >
> > > > > > > > > > One more question about the FLIP is that the FLIP says
> > "Note
> > > > that
> > > > > > > this
> > > > > > > > > > config does not support switching source's
> > > isProcessingBacklog
> > > > > from
> > > > > > > > false to true
> > > > > > > > > > for now.” Does that mean the job can never back to
> > streaming
> > > > mode
> > > > > > > once
> > > > > > > > switches into
> > > > > > > > > > backlog mode? It sounds like not a complete FLIP to me.
> Is
> > it
> > > > > > > possible
> > > > > > > > to
> > > > > > > > > > support switching back in this FLIP?
> > > > > > > > > +1 for Jark’s concern, IIUC, the state transition of
> > > > > > > IsProcessingBacklog
> > > > > > > > depends on whether the data in the source is processing
> backlog
> > > > data
> > > > > or
> > > > > > > > not. Different sources will have different backlog status and
> > > which
> > > > > may
> > > > > > > > change over time. From a general perspective, we should not
> > have
> > > > this
> > > > > > > > restriction.
> > > > > > > > >
> > > > > > > > > > The FLIP describe that: And it should report
> > > > > > > isProcessingBacklog=false
> > > > > > > > at the beginning of the snapshot stage.
> > > > > > > > > This should be “changelog stage”
> > > > > > > > >
> > > > > > > > > I'm not sure if it's enough to support this feature only in
> > > > FLIP-27
> > > > > > > > Source. Although we are pushing the sourceFunction API to be
> > > > removed,
> > > > > > > these
> > > > > > > > APIs will be survive one or two versions in flink repo before
> > > they
> > > > > are
> > > > > > > > actually removed.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Leonard
> > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Jark
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Thu, 7 Sept 2023 at 13:51, Xuannan Su <
> > > > suxuanna...@gmail.com>
> > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > >> Hi all,
> > > > > > > > > >>
> > > > > > > > > >> Thank you for all the reviews and suggestions.
> > > > > > > > > >>
> > > > > > > > > >> I believe all the comments have been addressed. If there
> > are
> > > > no
> > > > > > > > > >> further comments, I plan to open the voting thread for
> > this
> > > > FLIP
> > > > > > > early
> > > > > > > > > >> next week.
> > > > > > > > > >>
> > > > > > > > > >> Best regards,
> > > > > > > > > >> Xuannan
> > > > > > > > > >>
> > > > > > > > > >> On Thu, Sep 7, 2023 at 12:09 AM Jing Ge
> > > > > > <j...@ververica.com.invalid
> > > > > > > >
> > > > > > > > > >> wrote:
> > > > > > > > > >>>
> > > > > > > > > >>> Hi Xuannan,
> > > > > > > > > >>>
> > > > > > > > > >>> I thought FLIP-328 will compete with FLIP-309 while
> > setting
> > > > the
> > > > > > > > value of
> > > > > > > > > >>> the backlog. Understood. Thanks for the hint.
> > > > > > > > > >>>
> > > > > > > > > >>> Best regards,
> > > > > > > > > >>> Jing
> > > > > > > > > >>>
> > > > > > > > > >>> On Wed, Sep 6, 2023 at 12:12 PM Xuannan Su <
> > > > > > suxuanna...@gmail.com>
> > > > > > > > > >> wrote:
> > > > > > > > > >>>
> > > > > > > > > >>>> Hi Jing,
> > > > > > > > > >>>>
> > > > > > > > > >>>> Thank you for the clarification.
> > > > > > > > > >>>>
> > > > > > > > > >>>> For the use case you mentioned, I believe we can
> utilize
> > > the
> > > > > > > > > >>>> HybridSource, as updated in FLIP-309[1], to determine
> > the
> > > > > > backlog
> > > > > > > > > >>>> status. For example, if the user wants to process data
> > > > before
> > > > > > > time T
> > > > > > > > > >>>> in batch mode and after time T in stream mode, they
> can
> > > set
> > > > > the
> > > > > > > > first
> > > > > > > > > >>>> source of the HybridSource to read up to time T and
> the
> > > last
> > > > > > > source
> > > > > > > > of
> > > > > > > > > >>>> the HybridSource to read from time T.
> > > > > > > > > >>>>
> > > > > > > > > >>>> Best,
> > > > > > > > > >>>> Xuannan
> > > > > > > > > >>>>
> > > > > > > > > >>>> [1]
> > > > > > > > > >>>>
> > > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > > > > > > > > >>>>
> > > > > > > > > >>>>
> > > > > > > > > >>>> On Mon, Sep 4, 2023 at 10:36 PM Jing Ge
> > > > > > > <j...@ververica.com.invalid
> > > > > > > > >
> > > > > > > > > >>>> wrote:
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> Hi Xuannan,
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> Thanks for the clarification.
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> 3. Event time and process time are two different
> > things.
> > > It
> > > > > > might
> > > > > > > > be
> > > > > > > > > >>>> rarely
> > > > > > > > > >>>>> used, but conceptually, users can process data in the
> > > past
> > > > > > > within a
> > > > > > > > > >>>>> specific time range in the streaming mode. All data
> > > before
> > > > > that
> > > > > > > > range
> > > > > > > > > >>>> will
> > > > > > > > > >>>>> be considered as backlog and needed to be processed
> in
> > > the
> > > > > > batch
> > > > > > > > > >> mode,
> > > > > > > > > >>>>> like, e.g. the Present Perfect Progressive tense used
> > in
> > > > > > English
> > > > > > > > > >>>> language.
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> Best regards,
> > > > > > > > > >>>>> Jing
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> On Thu, Aug 31, 2023 at 4:45 AM Xuannan Su <
> > > > > > > suxuanna...@gmail.com>
> > > > > > > > > >>>> wrote:
> > > > > > > > > >>>>>
> > > > > > > > > >>>>>> Hi Jing,
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>>> Thanks for the reply.
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>>> 1. You are absolutely right that the watermark lag
> > > > threshold
> > > > > > > must
> > > > > > > > > >> be
> > > > > > > > > >>>>>> carefully set with a thorough understanding of
> > watermark
> > > > > > > > > >> generation.
> > > > > > > > > >>>> It is
> > > > > > > > > >>>>>> crucial for users to take into account the
> > > > WatermarkStrategy
> > > > > > > when
> > > > > > > > > >>>> setting
> > > > > > > > > >>>>>> the watermark lag threshold.
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>>> 2. Regarding pure processing-time based stream
> > > processing
> > > > > > jobs,
> > > > > > > > > >>>>>> alternative strategies will be implemented to
> > determine
> > > > > > whether
> > > > > > > > the
> > > > > > > > > >>>> job is
> > > > > > > > > >>>>>> processing backlog data. I have outlined two
> possible
> > > > > > strategies
> > > > > > > > > >> below:
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>>> - Based on the source operator's state. For example,
> > > when
> > > > > > MySQL
> > > > > > > > CDC
> > > > > > > > > >>>> source
> > > > > > > > > >>>>>> is reading snapshot, it can claim isBacklog=true.
> > > > > > > > > >>>>>> - Based on metrics. For example, when
> > > busyTimeMsPerSecond
> > > > > (or
> > > > > > > > > >>>>>> backPressuredTimeMsPerSecond) >
> > > user_specified_threshold,
> > > > > then
> > > > > > > > > >>>>>> isBacklog=true.
> > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> As for the strategies proposed in this FLIP, they rely on generated
> > > > > > > > > > >>>>>> watermarks. Therefore, if a user intends for the job to detect backlog
> > > > > > > > > > >>>>>> status based on watermark, it is necessary to generate the watermark.
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>>> 3. I'm afraid I'm not fully grasping your question.
> > From
> > > > my
> > > > > > > > > >>>> understanding,
> > > > > > > > > >>>>>> it should work in both cases. When event times are
> > close
> > > > to
> > > > > > the
> > > > > > > > > >>>> processing
> > > > > > > > > >>>>>> time, resulting in watermarks close to the
> processing
> > > > time,
> > > > > > the
> > > > > > > > > >> job is
> > > > > > > > > >>>> not
> > > > > > > > > >>>>>> processing backlog data. On the other hand, when
> event
> > > > times
> > > > > > are
> > > > > > > > > >> far
> > > > > > > > > >>>> from
> > > > > > > > > >>>>>> processing time, causing watermarks to also be
> > distant,
> > > if
> > > > > the
> > > > > > > lag
> > > > > > > > > >>>>>> surpasses the defined threshold, the job is
> considered
> > > > > > > processing
> > > > > > > > > >>>> backlog
> > > > > > > > > >>>>>> data.
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>>> Best,
> > > > > > > > > >>>>>> Xuannan
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>>>> On Aug 31, 2023, at 02:56, Jing Ge
> > > > > > <j...@ververica.com.INVALID
> > > > > > > >
> > > > > > > > > >>>> wrote:
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>> Hi Xuannan,
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>> Thanks for the clarification. That is the part
> where
> > I
> > > am
> > > > > > > trying
> > > > > > > > > >> to
> > > > > > > > > >>>>>>> understand your thoughts. I have some follow-up
> > > > questions:
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>> 1. It depends strongly on the watermarkStrategy and
> > what
> > > > > > > > > >> customized
> > > > > > > > > >>>>>>> watermark generation looks like. It mixes business
> > > logic
> > > > > with
> > > > > > > > > >>>> technical
> > > > > > > > > >>>>>>> implementation and technical data processing mode.
> > The
> > > > > value
> > > > > > of
> > > > > > > > > >> the
> > > > > > > > > >>>>>>> watermark lag threshold must be set very carefully.
> > If
> > > > the
> > > > > > > value
> > > > > > > > > >> is
> > > > > > > > > >>>> too
> > > > > > > > > > >>>>>>> small, then any time the watermark generation
> logic
> > is
> > > > > > > > > > >>>> changed (business
> > > > > > > > > >>>>>>> logic changes lead to the threshold getting
> > exceeded),
> > > > the
> > > > > > same
> > > > > > > > > >> job
> > > > > > > > > >>>> might
> > > > > > > > > >>>>>>> be running surprisingly in backlog processing mode,
> > > i.e.
> > > > a
> > > > > > > > > >> butterfly
> > > > > > > > > > >>>>>>> effect. Comprehensive documentation is required
> to
> > > > avoid
> > > > > > any
> > > > > > > > > >>>> confusion
> > > > > > > > > >>>>>>> for the users.
> > > > > > > > > >>>>>>> 2. Like Jark already mentioned, use cases that do
> not
> > > > have
> > > > > > > > > >>>> watermarks,
> > > > > > > > > >>>>>>> like pure processing-time based stream
> processing[1]
> > > are
> > > > > not
> > > > > > > > > >>>> covered. It
> > > > > > > > > >>>>>> is
> > > > > > > > > >>>>>>> more or less a trade-off solution that does not
> > support
> > > > > such
> > > > > > > use
> > > > > > > > > >>>> cases
> > > > > > > > > >>>>>> and
> > > > > > > > > >>>>>>> appropriate documentation is required. Forcing them
> > to
> > > > > > > explicitly
> > > > > > > > > >>>>>> generate
> > > > > > > > > >>>>>>> watermarks that are never needed just because of
> this
> > > > does
> > > > > > not
> > > > > > > > > >> sound
> > > > > > > > > >>>>>> like a
> > > > > > > > > >>>>>>> proper solution.
> > > > > > > > > >>>>>>> 3. If I am not mistaken, it only works for use
> cases
> > > > where
> > > > > > > event
> > > > > > > > > >>>> times
> > > > > > > > > >>>>>> are
> > > > > > > > > >>>>>>> very close to the processing times, because the
> wall
> > > > clock
> > > > > is
> > > > > > > > > >> used to
> > > > > > > > > >>>>>>> calculate the watermark lag and the watermark is
> > > > generated
> > > > > > > based
> > > > > > > > > >> on
> > > > > > > > > >>>> the
> > > > > > > > > >>>>>>> event time.
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>> Best regards,
> > > > > > > > > >>>>>>> Jing
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>> [1]
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>
> > > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/2c50b4e956305426f478b726d4de4a640a16b810/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L236
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>> On Wed, Aug 30, 2023 at 4:06 AM Xuannan Su <
> > > > > > > > > >> suxuanna...@gmail.com>
> > > > > > > > > >>>>>> wrote:
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>>> Hi Jing,
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>> Thank you for the suggestion.
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>> The definition of watermark lag is the same as the
> > > > > > > watermarkLag
> > > > > > > > > >>>> metric
> > > > > > > > > >>>>>> in
> > > > > > > > > >>>>>>>> FLIP-33[1]. More specifically, the watermark lag
> > > > > calculation
> > > > > > > is
> > > > > > > > > >>>>>> computed at
> > > > > > > > > >>>>>>>> the time when a watermark is emitted downstream in
> > the
> > > > > > > following
> > > > > > > > > >>>> way:
> > > > > > > > > >>>>>>>> watermarkLag = CurrentTime - Watermark. I have
> added
> > > > this
> > > > > > > > > >>>> description to
> > > > > > > > > >>>>>>>> the FLIP.
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>> I hope this addresses your concern.
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>> Best,
> > > > > > > > > >>>>>>>> Xuannan
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>> [1]
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>
> > > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>>> On Aug 28, 2023, at 01:04, Jing Ge
> > > > > > > <j...@ververica.com.INVALID
> > > > > > > > > >>>
> > > > > > > > > >>>> wrote:
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> Hi Xuannan,
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> Thanks for the proposal. +1 for me.
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> There is one tiny thing that I am not sure if I
> > > > > understand
> > > > > > it
> > > > > > > > > >>>>>> correctly.
> > > > > > > > > >>>>>>>>> Since there will be many different
> > > WatermarkStrategies
> > > > > and
> > > > > > > > > >>>> different
> > > > > > > > > > >>>>>>>>> WatermarkGenerators, could you please update the
> > FLIP
> > > > and
> > > > > > add
> > > > > > > > > >> the
> > > > > > > > > >>>>>>>>> description of how the watermark lag is
> calculated
> > > > > exactly?
> > > > > > > > > >> E.g.
> > > > > > > > > >>>>>>>> Watermark
> > > > > > > > > > >>>>>>>>> lag = A - B, where A is the timestamp of the
> > watermark
> > > > > > emitted
> > > > > > > > > >> to the
> > > > > > > > > >>>>>>>>> downstream and B is....(this is the part I am not
> > > > really
> > > > > > sure
> > > > > > > > > >> after
> > > > > > > > > >>>>>>>> reading
> > > > > > > > > >>>>>>>>> the FLIP).
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> Best regards,
> > > > > > > > > >>>>>>>>> Jing
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su <
> > > > > > > > > >> suxuanna...@gmail.com>
> > > > > > > > > >>>>>>>> wrote:
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>>> Hi Jark,
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> Thanks for the comments.
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> I agree that the current solution cannot support
> > > jobs
> > > > > that
> > > > > > > > > >> cannot
> > > > > > > > > >>>>>> define
> > > > > > > > > >>>>>>>>>> watermarks. However, after considering the
> > > > > > > > > >> pending-record-based
> > > > > > > > > >>>>>>>> solution, I
> > > > > > > > > >>>>>>>>>> believe the current solution is superior for the
> > > > target
> > > > > > use
> > > > > > > > > >> case
> > > > > > > > > >>>> as it
> > > > > > > > > >>>>>>>> is
> > > > > > > > > >>>>>>>>>> more intuitive for users. The backlog status
> gives
> > > > users
> > > > > > the
> > > > > > > > > >>>> ability
> > > > > > > > > >>>>>> to
> > > > > > > > > >>>>>>>>>> balance between throughput and latency. Making
> > this
> > > > > > > trade-off
> > > > > > > > > >>>> decision
> > > > > > > > > >>>>>>>>>> based on the watermark lag is more intuitive
> from
> > > the
> > > > > > user's
> > > > > > > > > >>>>>>>> perspective.
> > > > > > > > > >>>>>>>>>> For instance, a user can decide that if the job
> > lags
> > > > > > behind
> > > > > > > > > >> the
> > > > > > > > > >>>>>> current
> > > > > > > > > >>>>>>>>>> time by more than 1 hour, the result is not
> > usable.
> > > In
> > > > > > that
> > > > > > > > > >> case,
> > > > > > > > > >>>> we
> > > > > > > > > >>>>>> can
> > > > > > > > > >>>>>>>>>> optimize for throughput when the data lags
> behind
> > by
> > > > > more
> > > > > > > > > >> than an
> > > > > > > > > >>>>>> hour.
> > > > > > > > > >>>>>>>>>> With the pending-record-based solution, it's
> > > > challenging
> > > > > > for
> > > > > > > > > >>>> users to
> > > > > > > > > >>>>>>>>>> determine when to optimize for throughput and
> when
> > > to
> > > > > > > > > >> prioritize
> > > > > > > > > >>>>>>>> latency.
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> Regarding the limitations of the watermark-based
> > > > > solution:
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> 1. The current solution can support jobs with
> > > sources
> > > > > that
> > > > > > > > > >> have
> > > > > > > > > >>>> event
> > > > > > > > > >>>>>>>>>> time. Users can always define a watermark at the
> > > > source
> > > > > > > > > >> operator,
> > > > > > > > > >>>> even
> > > > > > > > > >>>>>>>> if
> > > > > > > > > >>>>>>>>>> it's not used by downstream operators, such as
> > > > streaming
> > > > > > > join
> > > > > > > > > >> and
> > > > > > > > > >>>>>>>> unbounded
> > > > > > > > > >>>>>>>>>> aggregate.
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > > >>>>>>>>>> 2. I don't believe it's accurate to say that the
> > > > > watermark
> > > > > > > lag
> > > > > > > > > >> will
> > > > > > > > > >>>>>> keep
> > > > > > > > > >>>>>>>>>> increasing if no data is generated in Kafka. The
> > > > > watermark
> > > > > > > > > >> lag and
> > > > > > > > > >>>>>>>> backlog
> > > > > > > > > >>>>>>>>>> status are determined at the moment when the
> > > watermark
> > > > > is
> > > > > > > > > >> emitted
> > > > > > > > > >>>> to
> > > > > > > > > >>>>>> the
> > > > > > > > > >>>>>>>>>> downstream operator. If no data is emitted from
> > the
> > > > > > source,
> > > > > > > > > >> the
> > > > > > > > > >>>>>>>> watermark
> > > > > > > > > >>>>>>>>>> lag and backlog status will not be updated. If
> the
> > > > > > > > > >>>> WatermarkStrategy
> > > > > > > > > >>>>>>>> with
> > > > > > > > > >>>>>>>>>> idleness is used, the source becomes non-backlog
> > > when
> > > > it
> > > > > > > > > >> becomes
> > > > > > > > > >>>> idle.
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> 3. I think watermark lag is more intuitive to
> > > > determine
> > > > > > if a
> > > > > > > > > >> job
> > > > > > > > > >>>> is
> > > > > > > > > >>>>>>>>>> processing backlog data. Even when using pending
> > > > > records,
> > > > > > it
> > > > > > > > > >>>> faces a
> > > > > > > > > >>>>>>>>>> similar issue. For example, if the source has 1K
> > > > pending
> > > > > > > > > >> records,
> > > > > > > > > >>>>>> those
> > > > > > > > > >>>>>>>>>> records can span from 1 day  to 1 hour to 1
> > second.
> > > If
> > > > > the
> > > > > > > > > >> records
> > > > > > > > > >>>>>> span
> > > > > > > > > >>>>>>>> 1
> > > > > > > > > >>>>>>>>>> day, it's probably best to optimize for
> > throughput.
> > > If
> > > > > > they
> > > > > > > > > >> span 1
> > > > > > > > > >>>>>>>> hour, it
> > > > > > > > > >>>>>>>>>> depends on the business logic. If they span 1
> > > second,
> > > > > > > > > >> optimizing
> > > > > > > > > >>>> for
> > > > > > > > > >>>>>>>>>> latency is likely the better choice.
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> In summary, I believe the watermark-based
> solution
> > > is
> > > > a
> > > > > > > > > >> superior
> > > > > > > > > >>>>>> choice
> > > > > > > > > >>>>>>>>>> for the target use case where watermark/event
> time
> > > can
> > > > > be
> > > > > > > > > >> defined.
> > > > > > > > > >>>>>>>>>> Additionally, I haven't come across a scenario
> > that
> > > > > > requires
> > > > > > > > > >>>>>> low-latency
> > > > > > > > > >>>>>>>>>> processing and reads from a source that cannot
> > > define
> > > > > > > > > >> watermarks.
> > > > > > > > > >>>> If
> > > > > > > > > >>>>>> we
> > > > > > > > > >>>>>>>>>> encounter such a use case, we can create another
> > > FLIP
> > > > to
> > > > > > > > > >> address
> > > > > > > > > >>>> those
> > > > > > > > > >>>>>>>>>> needs in the future. What do you think?
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> Best,
> > > > > > > > > >>>>>>>>>> Xuannan
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> On Aug 20, 2023, at 23:27, Jark Wu <
> > > imj...@gmail.com
> > > > > > > > > >> <mailto:
> > > > > > > > > >>>>>>>>>> imj...@gmail.com>> wrote:
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> Hi Xuannan,
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> Thanks for opening this discussion.
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> This current proposal may work in the mentioned
> > > > > watermark
> > > > > > > > > >> cases.
> > > > > > > > > >>>>>>>>>>> However, it seems this is not a general
> solution
> > > for
> > > > > > > sources
> > > > > > > > > >> to
> > > > > > > > > >>>>>>>> determine
> > > > > > > > > >>>>>>>>>>> "isProcessingBacklog".
> > > > > > > > > >>>>>>>>>>> From my point of view, there are 3 limitations
> of
> > > the
> > > > > > > current
> > > > > > > > > >>>>>> proposal:
> > > > > > > > > >>>>>>>>>>> 1. It doesn't cover jobs that don't have
> > > > > > > watermark/event-time
> > > > > > > > > >>>>>> defined,
> > > > > > > > > >>>>>>>>>>> for example streaming join and unbounded
> > aggregate.
> > > > We
> > > > > > may
> > > > > > > > > >> still
> > > > > > > > > >>>> need
> > > > > > > > > >>>>>>>> to
> > > > > > > > > >>>>>>>>>>> figure out solutions for them.
> > > > > > > > > >>>>>>>>>>> 2. Watermark lag can not be trusted, because it
> > > > > increases
> > > > > > > > > > >>>> without bound
> > > > > > > > > >>>>>> if
> > > > > > > > > >>>>>>>> no
> > > > > > > > > > >>>>>>>>>>> data is generated in Kafka.
> > > > > > > > > >>>>>>>>>>> But in this case, there is no backlog at all.
> > > > > > > > > > >>>>>>>>>>> 3. Watermark lag does not reliably reflect the amount
> of
> > > > > > backlog.
> > > > > > > > > >> If the
> > > > > > > > > >>>>>>>>>> watermark
> > > > > > > > > > >>>>>>>>>>> lag is 1 day or 1 hour or 1 second,
> > > > > > > > > >>>>>>>>>>> there is possibly only 1 pending record there,
> > > which
> > > > > > means
> > > > > > > no
> > > > > > > > > >>>> backlog
> > > > > > > > > >>>>>>>> at
> > > > > > > > > >>>>>>>>>>> all.
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>> Therefore, IMO, watermark may not be the ideal
> > > metric
> > > > > used
> > > > > > > to
> > > > > > > > > >>>>>> determine
> > > > > > > > > >>>>>>>>>>> "isProcessingBacklog".
> > > > > > > > > >>>>>>>>>>> What we need is something that reflects the
> > number
> > > of
> > > > > > > records
> > > > > > > > > >>>>>>>> unprocessed
> > > > > > > > > >>>>>>>>>>> by the job.
> > > > > > > > > >>>>>>>>>>> Actually, that is the "pendingRecords" metric
> > > > proposed
> > > > > in
> > > > > > > > > >>>> FLIP-33 and
> > > > > > > > > >>>>>>>> has
> > > > > > > > > >>>>>>>>>>> been implemented by Kafka source.
> > > > > > > > > >>>>>>>>>>> Did you consider using "pendingRecords" metric
> to
> > > > > > determine
> > > > > > > > > >>>>>>>>>>> "isProcessingBacklog"?
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> Best,
> > > > > > > > > >>>>>>>>>>> Jark
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> [1]
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>
> > > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > > > > > > >>>>>>>>>> <
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>
> > > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> On Tue, 15 Aug 2023 at 12:04, Xintong Song <
> > > > > > > > > >>>> tonysong...@gmail.com
> > > > > > > > > >>>>>>>>>> <mailto:tonysong...@gmail.com>> wrote:
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>> Sounds good to me.
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>> It is true that, if we are introducing the
> > > > generalized
> > > > > > > > > >>>> watermark,
> > > > > > > > > >>>>>>>> there
> > > > > > > > > >>>>>>>>>>>> will be other watermark related concepts /
> > > > > > configurations
> > > > > > > > > >> that
> > > > > > > > > >>>> need
> > > > > > > > > >>>>>> to
> > > > > > > > > >>>>>>>>>> be
> > > > > > > > > >>>>>>>>>>>> updated anyway.
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>> Best,
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>> Xintong
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>> On Tue, Aug 15, 2023 at 11:30 AM Xuannan Su <
> > > > > > > > > >>>> suxuanna...@gmail.com
> > > > > > > > > >>>>>>>>>> <mailto:suxuanna...@gmail.com>> wrote:
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>> Hi Xintong,
> > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>> Thank you for your suggestion.
> > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>> After considering the idea of using a general
> > > > > > > configuration
> > > > > > > > > >>>> key, I
> > > > > > > > > >>>>>>>>>> think
> > > > > > > > > >>>>>>>>>>>>> it may not be a good idea for the reasons
> > below.
> > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>> While I agree that using a more general
> > > > configuration
> > > > > > key
> > > > > > > > > >>>> provides
> > > > > > > > > >>>>>> us
> > > > > > > > > >>>>>>>>>>>> with
> > > > > > > > > >>>>>>>>>>>>> the flexibility to switch to other approaches
> > to
> > > > > > > calculate
> > > > > > > > > >> the
> > > > > > > > > >>>> lag
> > > > > > > > > >>>>>> in
> > > > > > > > > >>>>>>>>>> the
> > > > > > > > > >>>>>>>>>>>>> future, the downside is that it may cause
> > > confusion
> > > > > for
> > > > > > > > > >> users.
> > > > > > > > > >>>> We
> > > > > > > > > >>>>>>>>>>>> currently
> > > > > > > > > >>>>>>>>>>>>> have fetchEventTimeLag, emitEventTimeLag, and
> > > > > > > watermarkLag
> > > > > > > > > >> in
> > > > > > > > > >>>> the
> > > > > > > > > >>>>>>>>>> source,
> > > > > > > > > >>>>>>>>>>>>> and it is not clear which specific lag we are
> > > > > referring
> > > > > > > to.
> > > > > > > > > >>>> With
> > > > > > > > > >>>>>> the
> > > > > > > > > >>>>>>>>>>>>> potential introduction of the Generalized
> > > Watermark
> > > > > > > > > >> mechanism
> > > > > > > > > >>>> in
> > > > > > > > > >>>>>> the
> > > > > > > > > >>>>>>>>>>>>> future, if I understand correctly, a
> watermark
> > > > won't
> > > > > > > > > >>>> necessarily
> > > > > > > > > >>>>>> need
> > > > > > > > > >>>>>>>>>> to
> > > > > > > > > >>>>>>>>>>>> be
> > > > > > > > > >>>>>>>>>>>>> a timestamp. I am concern that the general
> > > > > > configuration
> > > > > > > > > >> key
> > > > > > > > > >>>> may
> > > > > > > > > >>>>>> not
> > > > > > > > > >>>>>>>>>> be
> > > > > > > > > > >>>>>>>>>>>>> enough to cover all the use cases and we will
> > need
> > > > to
> > > > > > > > > >> introduce
> > > > > > > > > >>>> a
> > > > > > > > > >>>>>>>>>> general
> > > > > > > > > >>>>>>>>>>>>> way to determine the backlog status
> regardless.
> > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>> For the reasons above, I prefer introducing
> the
> > > > > > > > > >> configuration
> > > > > > > > > >>>> as
> > > > > > > > > >>>>>> is,
> > > > > > > > > >>>>>>>>>> and
> > > > > > > > > > >>>>>>>>>>>>> change it later with a deprecation
> process
> > or
> > > > > > > migration
> > > > > > > > > >>>>>> process.
> > > > > > > > > >>>>>>>>>> What
> > > > > > > > > >>>>>>>>>>>>> do you think?
> > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>> Best,
> > > > > > > > > >>>>>>>>>>>>> Xuannan
> > > > > > > > > >>>>>>>>>>>>> On Aug 14, 2023, 14:09 +0800, Xintong Song <
> > > > > > > > > >>>> tonysong...@gmail.com
> > > > > > > > > >>>>>>>>>> <mailto:tonysong...@gmail.com>>,
> > > > > > > > > >>>>>>>>>>>> wrote:
> > > > > > > > > >>>>>>>>>>>>>> Thanks for the explanation.
> > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>> I wonder if it makes sense to not expose
> this
> > > > detail
> > > > > > via
> > > > > > > > > >> the
> > > > > > > > > >>>>>>>>>>>>> configuration
> > > > > > > > > >>>>>>>>>>>>>> option. To be specific, I suggest not
> > mentioning
> > > > the
> > > > > > > > > >>>> "watermark"
> > > > > > > > > >>>>>>>>>>>> keyword
> > > > > > > > > >>>>>>>>>>>>> in
> > > > > > > > > >>>>>>>>>>>>>> the configuration key and description.
> > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>> - From the users' perspective, I think they
> > only
> > > > > need
> > > > > > to
> > > > > > > > > >> know
> > > > > > > > > >>>>>>>> there's
> > > > > > > > > >>>>>>>>>> a
> > > > > > > > > >>>>>>>>>>>>>> lag higher than the given threshold, Flink
> > will
> > > > > > consider
> > > > > > > > > >>>> latency
> > > > > > > > > >>>>>> of
> > > > > > > > > >>>>>>>>>>>>>> individual records as less important and
> > > > prioritize
> > > > > > > > > >> throughput
> > > > > > > > > >>>>>> over
> > > > > > > > > >>>>>>>>>> it.
> > > > > > > > > >>>>>>>>>>>>>> They don't really need the details of how
> the
> > > lags
> > > > > are
> > > > > > > > > >>>> calculated.
> > > > > > > > > >>>>>>>>>>>>>> - For the internal implementation, I also
> > think
> > > > > using
> > > > > > > > > >>>> watermark
> > > > > > > > > >>>>>> lags
> > > > > > > > > >>>>>>>>>> is
> > > > > > > > > >>>>>>>>>>>>>> a good idea, for the reasons you've already
> > > > > mentioned.
> > > > > > > > > >>>> However,
> > > > > > > > > >>>>>> it's
> > > > > > > > > >>>>>>>>>>>> not
> > > > > > > > > >>>>>>>>>>>>>> the only possible option. Hiding this detail
> > > from
> > > > > > users
> > > > > > > > > >> would
> > > > > > > > > >>>> give
> > > > > > > > > >>>>>>>> us
> > > > > > > > > >>>>>>>>>>>> the
> > > > > > > > > >>>>>>>>>>>>>> flexibility to switch to other approaches if
> > > > needed
> > > > > in
> > > > > > > > > >> future.
> > > > > > > > > >>>>>>>>>>>>>> - We are currently working on designing the
> > > > > > > > > >> ProcessFunction
> > > > > > > > > >>>> API
> > > > > > > > > >>>>>>>>>>>>>> (consider it as a DataStream API V2).
> There's
> > an
> > > > > idea
> > > > > > to
> > > > > > > > > >>>>>> introduce a
> > > > > > > > > >>>>>>>>>>>>>> Generalized Watermark mechanism, where
> > basically
> > > > the
> > > > > > > > > >>>> watermark can
> > > > > > > > > >>>>>>>> be
> > > > > > > > > >>>>>>>>>>>>>> anything that needs to travel along the
> > > data-flow
> > > > > with
> > > > > > > > > >> certain
> > > > > > > > > >>>>>>>>>>>> alignment
> > > > > > > > > >>>>>>>>>>>>>> strategies, and event time watermark would
> be
> > > one
> > > > > > > specific
> > > > > > > > > >>>> case of
> > > > > > > > > >>>>>>>> it.
> > > > > > > > > >>>>>>>>>>>>> This
> > > > > > > > > >>>>>>>>>>>>>> is still an idea and has not been discussed
> > and
> > > > > agreed
> > > > > > > on
> > > > > > > > > >> by
> > > > > > > > > >>>> the
> > > > > > > > > >>>>>>>>>>>>> community,
> > > > > > > > > >>>>>>>>>>>>>> and we are preparing a FLIP for it. But if
> we
> > > are
> > > > > > going
> > > > > > > > > >> for
> > > > > > > > > >>>> it,
> > > > > > > > > >>>>>> the
> > > > > > > > > >>>>>>>>>>>>> concept
> > > > > > > > > >>>>>>>>>>>>>> "watermark-lag-threshold" could be
> ambiguous.
> > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>> I do not intend to block the FLIP on this.
> I'd
> > > > also
> > > > > be
> > > > > > > > > >> fine
> > > > > > > > > >>>> with
> > > > > > > > > >>>>>>>>>>>>>> introducing the configuration as is, and
> > > changing
> > > > it
> > > > > > > > > >> later, if
> > > > > > > > > >>>>>>>> needed,
> > > > > > > > > >>>>>>>>>>>>> with
> > > > > > > > > >>>>>>>>>>>>>> a regular deprecation and migration process.
> > > Just
> > > > > > making
> > > > > > > > > >> my
> > > > > > > > > >>>>>>>>>>>> suggestions.
> > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>> Best,
> > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>> Xintong
> > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>> On Mon, Aug 14, 2023 at 12:00 PM Xuannan Su
> <
> > > > > > > > > >>>>>> suxuanna...@gmail.com
> > > > > > > > > >>>>>>>>>> <mailto:suxuanna...@gmail.com>>
> > > > > > > > > >>>>>>>>>>>>> wrote:
> > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>> Hi Xintong,
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>> Thanks for the reply.
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>> I have considered using the timestamp in
> the
> > > > > records
> > > > > > to
> > > > > > > > > >>>> determine
> > > > > > > > > >>>>>>>> the
> > > > > > > > > >>>>>>>>>>>>>>> backlog status, and decided to use
> watermark
> > at
> > > > the
> > > > > > > end.
> > > > > > > > > >> By
> > > > > > > > > >>>>>>>>>>>> definition,
> > > > > > > > > >>>>>>>>>>>>>>> watermark is the time progress indication
> in
> > > the
> > > > > data
> > > > > > > > > >>>> stream. It
> > > > > > > > > >>>>>>>>>>>>> indicates
> > > > > > > > > >>>>>>>>>>>>>>> the stream’s event time has progressed to
> > some
> > > > > > specific
> > > > > > > > > >>>> time. On
> > > > > > > > > >>>>>>>> the
> > > > > > > > > >>>>>>>>>>>>> other
> > > > > > > > > >>>>>>>>>>>>>>> hand, timestamp in the records is usually
> > used
> > > to
> > > > > > > > > >> generate
> > > > > > > > > >>>> the
> > > > > > > > > >>>>>>>>>>>>> watermark.
> > > > > > > > > >>>>>>>>>>>>>>> Therefore, it appears more appropriate and
> > > > > intuitive
> > > > > > to
> > > > > > > > > >>>> calculate
> > > > > > > > > >>>>>>>> the
> > > > > > > > > >>>>>>>>>>>>> event
> > > > > > > > > >>>>>>>>>>>>>>> time lag by watermark and determine the
> > backlog
> > > > > > status.
> > > > > > > > > >> And
> > > > > > > > > >>>> by
> > > > > > > > > >>>>>>>> using
> > > > > > > > > >>>>>>>>>>>>> the
> > > > > > > > > >>>>>>>>>>>>>>> watermark, we can easily deal with the
> > > > out-of-order
> > > > > > and
> > > > > > > > > >> the
> > > > > > > > > >>>>>>>> idleness
> > > > > > > > > >>>>>>>>>>>>> of the
> > > > > > > > > >>>>>>>>>>>>>>> data.
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>> Please let me know if you have further
> > > questions.
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>> Best,
> > > > > > > > > >>>>>>>>>>>>>>> Xuannan
> > > > > > > > > >>>>>>>>>>>>>>> On Aug 10, 2023, 20:23 +0800, Xintong Song
> <
> > > > > > > > > >>>>>> tonysong...@gmail.com
> > > > > > > > > >>>>>>>>>> <mailto:tonysong...@gmail.com>>,
> > > > > > > > > >>>>>>>>>>>>> wrote:
> > > > > > > > > >>>>>>>>>>>>>>>> Thanks for preparing the FLIP, Xuannan.
> > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> +1 in general.
> > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> A quick question, could you explain why we
> > are
> > > > > > relying
> > > > > > > > > >> on
> > > > > > > > > >>>> the
> > > > > > > > > >>>>>>>>>>>>> watermark
> > > > > > > > > >>>>>>>>>>>>>>> for
> > > > > > > > > >>>>>>>>>>>>>>>> emitting the record attribute? Why not use
> > > > > > timestamps
> > > > > > > > > >> in the
> > > > > > > > > >>>>>>>>>>>>> records? I
> > > > > > > > > >>>>>>>>>>>>>>>> don't see any concern in using watermarks.
> > > Just
> > > > > > > > > >> wondering if
> > > > > > > > > >>>>>>>>>>>> there's
> > > > > > > > > >>>>>>>>>>>>> any
> > > > > > > > > >>>>>>>>>>>>>>>> deep considerations behind this.
> > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> Best,
> > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> Xintong
> > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su
> <
> > > > > > > > > >>>>>> suxuanna...@gmail.com
> > > > > > > > > >>>>>>>>>> <mailto:suxuanna...@gmail.com>>
> > > > > > > > > >>>>>>>>>>>>> wrote:
> > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>> Hi all,
> > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>> I am opening this thread to discuss
> > FLIP-328:
> > > > > Allow
> > > > > > > > > >> source
> > > > > > > > > >>>>>>>>>>>>> operators to
> > > > > > > > > >>>>>>>>>>>>>>>>> determine isProcessingBacklog based on
> > > > watermark
> > > > > > > > > >> lag[1]. We
> > > > > > > > > > >>>>>> had
> > > > > > > > > >>>>>>>>>>>>>>> several
> > > > > > > > > > >>>>>>>>>>>>>>>>> discussions with Dong Lin about the
> > design,
> > > > and
> > > > > > > thanks
> > > > > > > > > >>>> for all
> > > > > > > > > >>>>>>>>>>>> the
> > > > > > > > > >>>>>>>>>>>>>>>>> valuable advice.
> > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>> The FLIP aims to target the use-case
> where
> > > users
> > > > > > want
> > > > > > > to
> > > > > > > > > >>>> run a
> > > > > > > > > >>>>>>>>>>>> Flink
> > > > > > > > > >>>>>>>>>>>>>>> job to
> > > > > > > > > >>>>>>>>>>>>>>>>> backfill historical data in a high
> > throughput
> > > > > > manner
> > > > > > > > > >> and
> > > > > > > > > >>>>>> continue
> > > > > > > > > >>>>>>>>>>>>>>>>> processing real-time data with low
> latency.
> > > > > > Building
> > > > > > > > > >> upon
> > > > > > > > > >>>> the
> > > > > > > > > >>>>>>>>>>>>> backlog
> > > > > > > > > >>>>>>>>>>>>>>>>> concept introduced in FLIP-309[2], this
> > > > proposal
> > > > > > > > > >> enables
> > > > > > > > > >>>>>> sources
> > > > > > > > > >>>>>>>>>>>> to
> > > > > > > > > >>>>>>>>>>>>>>> report
> > > > > > > > > >>>>>>>>>>>>>>>>> their status of processing backlog based
> on
> > > the
> > > > > > > > > >> watermark
> > > > > > > > > >>>> lag.
> > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>> We would greatly appreciate any comments
> or
> > > > > > feedback
> > > > > > > > > >> you
> > > > > > > > > >>>> may
> > > > > > > > > >>>>>> have
> > > > > > > > > >>>>>>>>>>>>> on
> > > > > > > > > >>>>>>>>>>>>>>> this
> > > > > > > > > >>>>>>>>>>>>>>>>> proposal.
> > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>> Best,
> > > > > > > > > >>>>>>>>>>>>>>>>> Xuannan
> > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>> [1]
> > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>
> > > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> > > > > > > > > >>>>>>>>>> <
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>
> > > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>> [2]
> > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>
> > > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > > > > > > > > >>>>>>>>>> <
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>
> > > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
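
P.S. For anyone skimming the quoted thread, here is a minimal sketch of the
watermark-lag rule discussed above (watermarkLag = CurrentTime - Watermark,
evaluated only at the moment a watermark is emitted), together with the rule
that the job's backlog is true if any strategy says backlog=true. It is plain
Python for illustration only and is not Flink code; the class name, method
names, and the on_idle hook are hypothetical.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass
class WatermarkLagBacklogDetector:
    """Hypothetical helper for illustration; not part of Flink's API."""

    threshold_ms: int
    # Unknown until the first watermark is emitted.
    is_backlog: Optional[bool] = None

    def on_watermark_emitted(self, watermark_ms: int, current_time_ms: int) -> bool:
        # watermarkLag = CurrentTime - Watermark, computed only when a
        # watermark is emitted downstream. If no data arrives, this method
        # is never called and the status stays as it was.
        lag_ms = current_time_ms - watermark_ms
        self.is_backlog = lag_ms > self.threshold_ms
        return self.is_backlog

    def on_idle(self) -> bool:
        # With an idleness-aware watermark strategy, an idle source is
        # treated as non-backlog.
        self.is_backlog = False
        return self.is_backlog


def job_backlog(strategy_results: Iterable[bool]) -> bool:
    # The job is in backlog if any triggered strategy reports backlog=true.
    return any(strategy_results)
```

With a one-hour threshold (3,600,000 ms), a watermark that trails the wall
clock by two hours marks the source as backlog, while one that trails by a few
minutes does not.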
