Hi Jark,

Thanks for the comments. Please see my comments inline.

On Sat, Sep 9, 2023 at 4:13 PM Jark Wu <imj...@gmail.com> wrote:

> Hi Xuannan,
>
> I leave my comments inline.
>
> > In the case where a user wants to
> > use a CDC source and also determine backlog status based on watermark
> > lag, we still need to define the rule when that occurs
>
> This rule should be defined by the source itself (who knows backlog best),
> not by the framework. In the case of CDC source, it reports isBacklog=true
> during snapshot stage, and report isBacklog=false during changelog stage if
> watermark-lag is within the threshold.
>

I am not sure I fully understand the difference between adding a job-level
config vs. adding a per-source config.

In the case of CDC, its watermark lag should be either undefined or
really large in the snapshot stage. As a result, the proposed job-level
config will be applied only in the changelog stage. So there is no
difference between these two approaches in this particular case, right?

There are two advantages of the job-level config over per-source config:

1) Configuration is simpler. For example, suppose a user has a Flink job
that consumes records from multiple Kafka sources and wants to determine
backlog status for these Kafka sources using the same watermark lag
threshold. With a job-level config, the user does not need to repeat this
threshold for each source.

2) There is a smaller number of public APIs overall. In particular, instead
of repeatedly adding a setProcessingBacklogWatermarkLagThreshold() API for
every source operator that has event-time watermark lag defined, we only
need to add one job-level config. Fewer public APIs generally mean better
simplicity and maintainability.

On the other hand, per-source config will be necessary if users want to
apply different watermark lag thresholds for different sources in the same
job. Personally, I find it a bit counter-intuitive for users to specify
different watermark lag thresholds in the same job.

Do you think there is any real-world use-case that requires this? Could you
provide a specific use-case where per-source config can provide an
advantage over the job-level config?
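
For concreteness, here is a minimal sketch of how the two options differ
from the user's side. It is not Flink code: the class and method names are
hypothetical and only illustrate that a job-level threshold is specified
once, while per-source thresholds must be specified for every source.

```java
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical illustration only; none of these names exist in Flink or FLIP-328.
final class ThresholdLookupSketch {

    // Job-level approach: a single threshold applies to every source in the job.
    static long jobLevel(long jobThresholdMs, String sourceName) {
        return jobThresholdMs;
    }

    // Per-source approach: a source only has a threshold if the user set one for it.
    static OptionalLong perSource(Map<String, Long> thresholdsMs, String sourceName) {
        Long threshold = thresholdsMs.get(sourceName);
        return threshold == null ? OptionalLong.empty() : OptionalLong.of(threshold);
    }
}
```

With the per-source variant, a job reading from N Kafka sources needs N
entries even when all of them use the same value.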


> I think it's not intuitive to combine it with the logical OR operation.
> Even for the combination logic of backlog status from different channels,
> FLIP-309 said it is "up to the operator to determine its output records'
> isBacklog value" and proposed 3 different strategies. Therefore, I think
> backlog status from a single source should be up to the source.


For both the job-level config and the per-source config, it is eventually
up to the user to decide the computation logic of the backlog status.
Whether this mechanism is implemented at the per-source level or framework
level is probably more like an implementation detail.

Eventually, I think the choice between these two approaches depends on
whether we have any use-case for users to specify different watermark lag
thresholds in the same job.


>
> IMO, a better API design is not how to resolve conflicts but not
> introducing conflicts.


Just to clarify, the current FLIP does not introduce any conflict. Each
source can have its own rule that specifies when the backlog can be true
(e.g. MySQL CDC says the backlog should be true during the snapshot stage).
And we can have a job-level config that specifies when the backlog should
be true. Note that it is designed in such a way that none of these rules
specify when the backlog should be false. That is why there is no conflict
by definition.
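
To make this concrete, here is a minimal sketch of the combination logic. It
is not the actual implementation: the class, method, and parameter names are
hypothetical, and the lag is assumed to be CurrentTime - Watermark in
milliseconds as described earlier in this thread. Each rule can only vote
for backlog=true, so combining them with a logical OR can never produce a
contradiction.

```java
// Hypothetical illustration only; none of these names exist in Flink.
final class BacklogRuleSketch {

    // Source-defined rule (assumed): e.g. a CDC source votes "true" during its snapshot stage.
    // Job-level rule (assumed): votes "true" when the watermark lag exceeds the threshold.
    // Neither rule ever asserts "false"; backlog is false only when no rule votes "true".
    static boolean isBacklog(boolean sourceVotesBacklog, long watermarkLagMs, long thresholdMs) {
        boolean lagVotesBacklog = watermarkLagMs > thresholdMs;
        return sourceVotesBacklog || lagVotesBacklog;
    }
}
```

For example, with a one-minute threshold, a source in its snapshot stage is
in backlog regardless of lag, and a source in its changelog stage is in
backlog only while its lag is above one minute.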



> Let the source determine backlog status removes the conflicts and I don't
> see big disadvantages.
>
> > It should not confuse the user that
> > DataStream#assignTimestampsAndWatermarks doesn't work with
> > backlog.watermark-lag-threshold, as it is not a source.
>
> Hmm, so this configuration may confuse Flink SQL users, because all
> watermarks are defined on the source DDL, but it may use a separate
> operator to emit watermarks if the source doesn't support emitting
> watermarks.
>

If I understand your comments correctly, you mean that we might have a
Flink SQL DDL with user-defined watermark expressions. And users also want
to set the backlog to true if the watermark generated by that
user-specified expression exceeds a threshold.

That is a good point and use-case. I agree we should also cover this
scenario. And we can update FLIP-328 to mention that the job-level config
will also be applicable when the watermark derived from the Flink SQL DDL
exceeds this threshold. Would this address your concern?


>
> > I think the description in the FLIP actually means the other way
> > around, where the job can never switch back to batch mode once it has
> > switched into streaming mode. This is to align with the current state
> > of FLIP-327[1], where only switching from batch to stream mode is
> > supported.
>
> This sounds like a limitation of FLIP-327 (that execution mode depends on
> backlog status). But the backlog status shouldn't have this limitation,
> because it is not only used for execution switching.
>

You are right that this is a limitation. However, this is only a short-term
limitation which we added to make sure that we can focus on the capability
to switch from backlog=true to backlog=false. In the future, we will remove
this limitation and also support switching from backlog=false to
backlog=true.

The capability to switch from backlog=true to backlog=false will mitigate a
lot of problems we are facing now, as it is common for users to start a
Flink job to process backlog data followed by real-time data. On the other
hand, switching from backlog=false to backlog=true is useful when there is
a traffic spike while the Flink job is processing real-time data. This is
also worth addressing but less important than the former.

Given that both features require considerable changes to the underlying
runtime, we think it might be useful and safe to tackle them one by one.
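
To illustrate the short-term limitation, here is a minimal sketch of a
status tracker that only allows the true-to-false transition. It is a
hypothetical illustration, not the FLIP's implementation; the class and
method names are made up for this email.

```java
// Hypothetical illustration only; none of these names exist in Flink.
final class OneWayBacklogSketch {

    // Becomes true the first time the source is observed as not processing backlog.
    private boolean switchedToRealTime = false;

    // Returns the reported backlog status for the latest observation.
    // Once backlog=false has been reported, later "true" observations are ignored.
    boolean update(boolean observedBacklog) {
        if (!observedBacklog) {
            switchedToRealTime = true;
        }
        return observedBacklog && !switchedToRealTime;
    }
}
```

Removing the limitation later would amount to dropping the sticky flag and
returning the observed value directly.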

Thanks again for the comments. Please let us know what you think.

Best,
Dong


>
> Best,
> Jark
>
>
>
> On Fri, 8 Sept 2023 at 19:09, Xuannan Su <suxuanna...@gmail.com> wrote:
>
> > Hi Jark and Leonard,
> >
> > Thanks for the comments. Please see my reply below.
> >
> > @Jark
> >
> > > I think a better API doesn't compete with itself. Therefore, I'm in
> > > favor of supporting the watermark lag threshold for each source
> > > without introducing any framework API and configuration.
> >
> > I don't think supporting the watermark lag threshold for each source
> > can avoid the competition problem. In the case where a user wants to
> > use a CDC source and also determine backlog status based on watermark
> > lag, we still need to define the rule when that occurs. With that
> > said, I think it is more intuitive to combine it with the logical OR
> > operation, as the strategies (FLIP-309, FLIP-328) only determine when
> > the source's backlog status should be True. What do you think?
> >
> > > Besides, this can address another concern that the watermark may be
> > > generated by DataStream#assignTimestampsAndWatermarks which doesn't
> > > work with the backlog.watermark-lag-threshold job config
> >
> > The description of the configuration explicitly states that "a source
> > would report isProcessingBacklog=true if its watermark lag exceeds the
> > configured value". It should not confuse the user that
> > DataStream#assignTimestampsAndWatermarks doesn't work with
> > backlog.watermark-lag-threshold, as it is not a source.
> >
> > > Does that mean the job can never back to streaming mode once switches
> > > into backlog mode? It sounds like not a complete FLIP to me. Is it
> > > possible to support switching back in this FLIP?
> >
> > I think the description in the FLIP actually means the other way
> > around, where the job can never switch back to batch mode once it has
> > switched into streaming mode. This is to align with the current state
> > of FLIP-327[1], where only switching from batch to stream mode is
> > supported.
> >
> > @Leonard
> >
> > > > The FLIP describe that: And it should report
> > > > isProcessingBacklog=false at the beginning of the snapshot stage.
> > > This should be “changelog stage”
> >
> > I think the description is in FLIP-309. Thanks for pointing out. I
> > updated the description.
> >
> > > I'm not sure if it's enough to support this feature only in FLIP-27
> > > Source. Although we are pushing the sourceFunction API to be removed,
> > > these APIs will survive one or two versions in flink repo before they
> > > are actually removed.
> >
> > I agree that it is good to support the SourceFunction API. However,
> > given that the SourceFunction API is marked as deprecated, I think I
> > will prioritize supporting the FLIP-27 Source. We can support the
> > SourceFunction API after the
> > FLIP-27 source. What do you think?
> >
> > Best regards,
> > Xuannan
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data
> >
> >
> >
> >
> > On Fri, Sep 8, 2023 at 1:02 AM Leonard Xu <xbjt...@gmail.com> wrote:
> > >
> > > Thanks Xuannan for driving this FLIP !
> > >
> > > The proposal generally looks good to me, but I still left some
> > > comments:
> > >
> > > > One more question about the FLIP is that the FLIP says "Note that
> > > > this config does not support switching source's isProcessingBacklog
> > > > from false to true for now." Does that mean the job can never back to
> > > > streaming mode once switches into backlog mode? It sounds like not a
> > > > complete FLIP to me. Is it possible to support switching back in this
> > > > FLIP?
> > > +1 for Jark’s concern, IIUC, the state transition of
> > > IsProcessingBacklog depends on whether the data in the source is
> > > processing backlog data or not. Different sources will have different
> > > backlog status and which may change over time. From a general
> > > perspective, we should not have this restriction.
> > >
> > > > The FLIP describe that: And it should report
> > > > isProcessingBacklog=false at the beginning of the snapshot stage.
> > > This should be “changelog stage”
> > >
> > > I'm not sure if it's enough to support this feature only in FLIP-27
> > > Source. Although we are pushing the sourceFunction API to be removed,
> > > these APIs will survive one or two versions in flink repo before they
> > > are actually removed.
> > >
> > > Best,
> > > Leonard
> > >
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > >
> > > > On Thu, 7 Sept 2023 at 13:51, Xuannan Su <suxuanna...@gmail.com>
> > wrote:
> > > >
> > > >> Hi all,
> > > >>
> > > >> Thank you for all the reviews and suggestions.
> > > >>
> > > >> I believe all the comments have been addressed. If there are no
> > > >> further comments, I plan to open the voting thread for this FLIP
> early
> > > >> next week.
> > > >>
> > > >> Best regards,
> > > >> Xuannan
> > > >>
> > > >> On Thu, Sep 7, 2023 at 12:09 AM Jing Ge <j...@ververica.com.invalid
> >
> > > >> wrote:
> > > >>>
> > > >>> Hi Xuannan,
> > > >>>
> > > >>> I thought FLIP-328 will compete with FLIP-309 while setting the
> > value of
> > > >>> the backlog. Understood. Thanks for the hint.
> > > >>>
> > > >>> Best regards,
> > > >>> Jing
> > > >>>
> > > >>> On Wed, Sep 6, 2023 at 12:12 PM Xuannan Su <suxuanna...@gmail.com>
> > > >> wrote:
> > > >>>
> > > >>>> Hi Jing,
> > > >>>>
> > > >>>> Thank you for the clarification.
> > > >>>>
> > > >>>> For the use case you mentioned, I believe we can utilize the
> > > >>>> HybridSource, as updated in FLIP-309[1], to determine the backlog
> > > >>>> status. For example, if the user wants to process data before
> time T
> > > >>>> in batch mode and after time T in stream mode, they can set the
> > first
> > > >>>> source of the HybridSource to read up to time T and the last
> source
> > of
> > > >>>> the HybridSource to read from time T.
> > > >>>>
> > > >>>> Best,
> > > >>>> Xuannan
> > > >>>>
> > > >>>> [1]
> > > >>>>
> > > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > > >>>>
> > > >>>>
> > > >>>> On Mon, Sep 4, 2023 at 10:36 PM Jing Ge
> <j...@ververica.com.invalid
> > >
> > > >>>> wrote:
> > > >>>>>
> > > >>>>> Hi Xuannan,
> > > >>>>>
> > > >>>>> Thanks for the clarification.
> > > >>>>>
> > > >>>>> 3. Event time and process time are two different things. It might
> > be
> > > >>>> rarely
> > > >>>>> used, but conceptually, users can process data in the past
> within a
> > > >>>>> specific time range in the streaming mode. All data before that
> > range
> > > >>>> will
> > > >>>>> be considered as backlog and needed to be processed in the batch
> > > >> mode,
> > > >>>>> like, e.g. the Present Perfect Progressive tense used in English
> > > >>>> language.
> > > >>>>>
> > > >>>>> Best regards,
> > > >>>>> Jing
> > > >>>>>
> > > >>>>> On Thu, Aug 31, 2023 at 4:45 AM Xuannan Su <
> suxuanna...@gmail.com>
> > > >>>> wrote:
> > > >>>>>
> > > >>>>>> Hi Jing,
> > > >>>>>>
> > > >>>>>> Thanks for the reply.
> > > >>>>>>
> > > >>>>>> 1. You are absolutely right that the watermark lag threshold
> must
> > > >> be
> > > >>>>>> carefully set with a thorough understanding of watermark
> > > >> generation.
> > > >>>> It is
> > > >>>>>> crucial for users to take into account the WatermarkStrategy
> when
> > > >>>> setting
> > > >>>>>> the watermark lag threshold.
> > > >>>>>>
> > > >>>>>> 2. Regarding pure processing-time based stream processing jobs,
> > > >>>>>> alternative strategies will be implemented to determine whether
> > the
> > > >>>> job is
> > > >>>>>> processing backlog data. I have outlined two possible strategies
> > > >> below:
> > > >>>>>>
> > > >>>>>> - Based on the source operator's state. For example, when MySQL
> > CDC
> > > >>>> source
> > > >>>>>> is reading snapshot, it can claim isBacklog=true.
> > > >>>>>> - Based on metrics. For example, when busyTimeMsPerSecond (or
> > > >>>>>> backPressuredTimeMsPerSecond) > user_specified_threshold, then
> > > >>>>>> isBacklog=true.
> > > >>>>>>
> > > >>>>>> As for the strategies proposed in this FLIP, they rely on generated
> > > >>>>>> watermarks. Therefore, if a user intends for the job to detect
> > > >> backlog
> > > >>>>>> status based on watermark, it is necessary to generate the
> > > >> watermark.
> > > >>>>>>
> > > >>>>>> 3. I'm afraid I'm not fully grasping your question. From my
> > > >>>> understanding,
> > > >>>>>> it should work in both cases. When event times are close to the
> > > >>>> processing
> > > >>>>>> time, resulting in watermarks close to the processing time, the
> > > >> job is
> > > >>>> not
> > > >>>>>> processing backlog data. On the other hand, when event times are
> > > >> far
> > > >>>> from
> > > >>>>>> processing time, causing watermarks to also be distant, if the
> lag
> > > >>>>>> surpasses the defined threshold, the job is considered
> processing
> > > >>>> backlog
> > > >>>>>> data.
> > > >>>>>>
> > > >>>>>> Best,
> > > >>>>>> Xuannan
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>> On Aug 31, 2023, at 02:56, Jing Ge <j...@ververica.com.INVALID
> >
> > > >>>> wrote:
> > > >>>>>>>
> > > >>>>>>> Hi Xuannan,
> > > >>>>>>>
> > > >>>>>>> Thanks for the clarification. That is the part where I am
> trying
> > > >> to
> > > >>>>>>> understand your thoughts. I have some follow-up questions:
> > > >>>>>>>
> > > >>>>>>> 1. It depends strongly on the watermarkStrategy and how
> > > >> customized
> > > >>>>>>> watermark generation looks like. It mixes business logic with
> > > >>>> technical
> > > >>>>>>> implementation and technical data processing mode. The value of
> > > >> the
> > > >>>>>>> watermark lag threshold must be set very carefully. If the
> value
> > > >> is
> > > >>>> too
> > > >>>>>>> small. any time, when the watermark generation logic is
> > > >>>> changed(business
> > > >>>>>>> logic changes lead to the threshold getting exceeded), the same
> > > >> job
> > > >>>> might
> > > >>>>>>> be running surprisingly in backlog processing mode, i.e. a
> > > >> butterfly
> > > >>>>>>> effect. A comprehensive documentation is required to avoid any
> > > >>>> confusion
> > > >>>>>>> for the users.
> > > >>>>>>> 2. Like Jark already mentioned, use cases that do not have
> > > >>>> watermarks,
> > > >>>>>>> like pure processing-time based stream processing[1] are not
> > > >>>> covered. It
> > > >>>>>> is
> > > >>>>>>> more or less a trade-off solution that does not support such
> use
> > > >>>> cases
> > > >>>>>> and
> > > >>>>>>> appropriate documentation is required. Forcing them to
> explicitly
> > > >>>>>> generate
> > > >>>>>>> watermarks that are never needed just because of this does not
> > > >> sound
> > > >>>>>> like a
> > > >>>>>>> proper solution.
> > > >>>>>>> 3. If I am not mistaken, it only works for use cases where
> event
> > > >>>> times
> > > >>>>>> are
> > > >>>>>>> very close to the processing times, because the wall clock is
> > > >> used to
> > > >>>>>>> calculate the watermark lag and the watermark is generated
> based
> > > >> on
> > > >>>> the
> > > >>>>>>> event time.
> > > >>>>>>>
> > > >>>>>>> Best regards,
> > > >>>>>>> Jing
> > > >>>>>>>
> > > >>>>>>> [1]
> > > >>>>>>>
> > > >>>>>>
> > > >>>>
> > > >>
> >
> https://github.com/apache/flink/blob/2c50b4e956305426f478b726d4de4a640a16b810/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L236
> > > >>>>>>>
> > > >>>>>>> On Wed, Aug 30, 2023 at 4:06 AM Xuannan Su <
> > > >> suxuanna...@gmail.com>
> > > >>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>>> Hi Jing,
> > > >>>>>>>>
> > > >>>>>>>> Thank you for the suggestion.
> > > >>>>>>>>
> > > >>>>>>>> The definition of watermark lag is the same as the
> watermarkLag
> > > >>>> metric
> > > >>>>>> in
> > > >>>>>>>> FLIP-33[1]. More specifically, the watermark lag calculation
> is
> > > >>>>>> computed at
> > > >>>>>>>> the time when a watermark is emitted downstream in the
> following
> > > >>>> way:
> > > >>>>>>>> watermarkLag = CurrentTime - Watermark. I have added this
> > > >>>> description to
> > > >>>>>>>> the FLIP.
> > > >>>>>>>>
> > > >>>>>>>> I hope this addresses your concern.
> > > >>>>>>>>
> > > >>>>>>>> Best,
> > > >>>>>>>> Xuannan
> > > >>>>>>>>
> > > >>>>>>>> [1]
> > > >>>>>>>>
> > > >>>>>>
> > > >>>>
> > > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>> On Aug 28, 2023, at 01:04, Jing Ge
> <j...@ververica.com.INVALID
> > > >>>
> > > >>>> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>> Hi Xuannan,
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks for the proposal. +1 for me.
> > > >>>>>>>>>
> > > >>>>>>>>> There is one tiny thing that I am not sure if I understand it
> > > >>>>>> correctly.
> > > >>>>>>>>> Since there will be many different WatermarkStrategies and
> > > >>>> different
> > > >>>>>>>>> WatermarkGenerators. Could you please update the FLIP and add
> > > >> the
> > > >>>>>>>>> description of how the watermark lag is calculated exactly?
> > > >> E.g.
> > > >>>>>>>> Watermark
> > > >>>>>>>>> lag = A - B with A is the timestamp of the watermark emitted
> > > >> to the
> > > >>>>>>>>> downstream and B is....(this is the part I am not really sure
> > > >> after
> > > >>>>>>>> reading
> > > >>>>>>>>> the FLIP).
> > > >>>>>>>>>
> > > >>>>>>>>> Best regards,
> > > >>>>>>>>> Jing
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su <
> > > >> suxuanna...@gmail.com>
> > > >>>>>>>> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> Hi Jark,
> > > >>>>>>>>>>
> > > >>>>>>>>>> Thanks for the comments.
> > > >>>>>>>>>>
> > > >>>>>>>>>> I agree that the current solution cannot support jobs that
> > > >> cannot
> > > >>>>>> define
> > > >>>>>>>>>> watermarks. However, after considering the
> > > >> pending-record-based
> > > >>>>>>>> solution, I
> > > >>>>>>>>>> believe the current solution is superior for the target use
> > > >> case
> > > >>>> as it
> > > >>>>>>>> is
> > > >>>>>>>>>> more intuitive for users. The backlog status gives users the
> > > >>>> ability
> > > >>>>>> to
> > > >>>>>>>>>> balance between throughput and latency. Making this
> trade-off
> > > >>>> decision
> > > >>>>>>>>>> based on the watermark lag is more intuitive from the user's
> > > >>>>>>>> perspective.
> > > >>>>>>>>>> For instance, a user can decide that if the job lags behind
> > > >> the
> > > >>>>>> current
> > > >>>>>>>>>> time by more than 1 hour, the result is not usable. In that
> > > >> case,
> > > >>>> we
> > > >>>>>> can
> > > >>>>>>>>>> optimize for throughput when the data lags behind by more
> > > >> than an
> > > >>>>>> hour.
> > > >>>>>>>>>> With the pending-record-based solution, it's challenging for
> > > >>>> users to
> > > >>>>>>>>>> determine when to optimize for throughput and when to
> > > >> prioritize
> > > >>>>>>>> latency.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Regarding the limitations of the watermark-based solution:
> > > >>>>>>>>>>
> > > >>>>>>>>>> 1. The current solution can support jobs with sources that
> > > >> have
> > > >>>> event
> > > >>>>>>>>>> time. Users can always define a watermark at the source
> > > >> operator,
> > > >>>> even
> > > >>>>>>>> if
> > > >>>>>>>>>> it's not used by downstream operators, such as streaming
> join
> > > >> and
> > > >>>>>>>> unbounded
> > > >>>>>>>>>> aggregate.
> > > >>>>>>>>>>
> > > >>>>>>>>>> 2. I don't believe it's accurate to say that the watermark
> lag
> > > >> will
> > > >>>>>> keep
> > > >>>>>>>>>> increasing if no data is generated in Kafka. The watermark
> > > >> lag and
> > > >>>>>>>> backlog
> > > >>>>>>>>>> status are determined at the moment when the watermark is
> > > >> emitted
> > > >>>> to
> > > >>>>>> the
> > > >>>>>>>>>> downstream operator. If no data is emitted from the source,
> > > >> the
> > > >>>>>>>> watermark
> > > >>>>>>>>>> lag and backlog status will not be updated. If the
> > > >>>> WatermarkStrategy
> > > >>>>>>>> with
> > > >>>>>>>>>> idleness is used, the source becomes non-backlog when it
> > > >> becomes
> > > >>>> idle.
> > > >>>>>>>>>>
> > > >>>>>>>>>> 3. I think watermark lag is more intuitive to determine if a
> > > >> job
> > > >>>> is
> > > >>>>>>>>>> processing backlog data. Even when using pending records, it
> > > >>>> faces a
> > > >>>>>>>>>> similar issue. For example, if the source has 1K pending
> > > >> records,
> > > >>>>>> those
> > > >>>>>>>>>> records can span from 1 day to 1 hour to 1 second. If the
> > > >> records
> > > >>>>>> span
> > > >>>>>>>> 1
> > > >>>>>>>>>> day, it's probably best to optimize for throughput. If they
> > > >> span 1
> > > >>>>>>>> hour, it
> > > >>>>>>>>>> depends on the business logic. If they span 1 second,
> > > >> optimizing
> > > >>>> for
> > > >>>>>>>>>> latency is likely the better choice.
> > > >>>>>>>>>>
> > > >>>>>>>>>> In summary, I believe the watermark-based solution is a
> > > >> superior
> > > >>>>>> choice
> > > >>>>>>>>>> for the target use case where watermark/event time can be
> > > >> defined.
> > > >>>>>>>>>> Additionally, I haven't come across a scenario that requires
> > > >>>>>> low-latency
> > > >>>>>>>>>> processing and reads from a source that cannot define
> > > >> watermarks.
> > > >>>> If
> > > >>>>>> we
> > > >>>>>>>>>> encounter such a use case, we can create another FLIP to
> > > >> address
> > > >>>> those
> > > >>>>>>>>>> needs in the future. What do you think?
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> Best,
> > > >>>>>>>>>> Xuannan
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>>> On Aug 20, 2023, at 23:27, Jark Wu <imj...@gmail.com
> > > >> <mailto:
> > > >>>>>>>>>> imj...@gmail.com>> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Hi Xuannan,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Thanks for opening this discussion.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> This current proposal may work in the mentioned watermark
> > > >> cases.
> > > >>>>>>>>>>> However, it seems this is not a general solution for
> sources
> > > >> to
> > > >>>>>>>> determine
> > > >>>>>>>>>>> "isProcessingBacklog".
> > > >>>>>>>>>>> From my point of view, there are 3 limitations of the
> current
> > > >>>>>> proposal:
> > > >>>>>>>>>>> 1. It doesn't cover jobs that don't have
> watermark/event-time
> > > >>>>>> defined,
> > > >>>>>>>>>>> for example streaming join and unbounded aggregate. We may
> > > >> still
> > > >>>> need
> > > >>>>>>>> to
> > > >>>>>>>>>>> figure out solutions for them.
> > > >>>>>>>>>>> 2. Watermark lag can not be trusted, because it increases
> > > >>>> unlimited
> > > >>>>>> if
> > > >>>>>>>> no
> > > >>>>>>>>>>> data is generated in the Kafka.
> > > >>>>>>>>>>> But in this case, there is no backlog at all.
> > > >>>>>>>>>>> 3. Watermark lag is hard to reflect the amount of backlog.
> > > >> If the
> > > >>>>>>>>>> watermark
> > > >>>>>>>>>>> lag is 1 day or 1 hour or 1 second,
> > > >>>>>>>>>>> there is possibly only 1 pending record there, which means
> no
> > > >>>> backlog
> > > >>>>>>>> at
> > > >>>>>>>>>>> all.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Therefore, IMO, watermark maybe not the ideal metric used
> to
> > > >>>>>> determine
> > > >>>>>>>>>>> "isProcessingBacklog".
> > > >>>>>>>>>>> What we need is something that reflects the number of
> records
> > > >>>>>>>> unprocessed
> > > >>>>>>>>>>> by the job.
> > > >>>>>>>>>>> Actually, that is the "pendingRecords" metric proposed in
> > > >>>> FLIP-33 and
> > > >>>>>>>> has
> > > >>>>>>>>>>> been implemented by Kafka source.
> > > >>>>>>>>>>> Did you consider using "pendingRecords" metric to determine
> > > >>>>>>>>>>> "isProcessingBacklog"?
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Best,
> > > >>>>>>>>>>> Jark
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> [1]
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>
> > > >>>>
> > > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On Tue, 15 Aug 2023 at 12:04, Xintong Song <
> > > >>>> tonysong...@gmail.com
> > > >>>>>>>>>> <mailto:tonysong...@gmail.com>> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Sounds good to me.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> It is true that, if we are introducing the generalized
> > > >>>> watermark,
> > > >>>>>>>> there
> > > >>>>>>>>>>>> will be other watermark related concepts / configurations
> > > >> that
> > > >>>> need
> > > >>>>>> to
> > > >>>>>>>>>> be
> > > >>>>>>>>>>>> updated anyway.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Xintong
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On Tue, Aug 15, 2023 at 11:30 AM Xuannan Su <
> > > >>>> suxuanna...@gmail.com
> > > >>>>>>>>>> <mailto:suxuanna...@gmail.com>> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> Hi Xintong,
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Thank you for your suggestion.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> After considering the idea of using a general
> configuration
> > > >>>> key, I
> > > >>>>>>>>>> think
> > > >>>>>>>>>>>>> it may not be a good idea for the reasons below.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> While I agree that using a more general configuration key
> > > >>>> provides
> > > >>>>>> us
> > > >>>>>>>>>>>> with
> > > >>>>>>>>>>>>> the flexibility to switch to other approaches to
> calculate
> > > >> the
> > > >>>> lag
> > > >>>>>> in
> > > >>>>>>>>>> the
> > > >>>>>>>>>>>>> future, the downside is that it may cause confusion for
> > > >> users.
> > > >>>> We
> > > >>>>>>>>>>>> currently
> > > >>>>>>>>>>>>> have fetchEventTimeLag, emitEventTimeLag, and
> watermarkLag
> > > >> in
> > > >>>> the
> > > >>>>>>>>>> source,
> > > >>>>>>>>>>>>> and it is not clear which specific lag we are referring
> to.
> > > >>>> With
> > > >>>>>> the
> > > >>>>>>>>>>>>> potential introduction of the Generalized Watermark
> > > >> mechanism
> > > >>>> in
> > > >>>>>> the
> > > >>>>>>>>>>>>> future, if I understand correctly, a watermark won't
> > > >>>> necessarily
> > > >>>>>> need
> > > >>>>>>>>>> to
> > > >>>>>>>>>>>> be
> > > >>>>>>>>>>>>> a timestamp. I am concerned that the general configuration
> > > >> key
> > > >>>> may
> > > >>>>>> not
> > > >>>>>>>>>> be
> > > >>>>>>>>>>>>> enough to cover all the use case and we will need to
> > > >> introduce
> > > >>>> a
> > > >>>>>>>>>> general
> > > >>>>>>>>>>>>> way to determine the backlog status regardless.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> For the reasons above, I prefer introducing the
> > > >> configuration
> > > >>>> as
> > > >>>>>> is,
> > > >>>>>>>>>> and
> > > >>>>>>>>>>>>> change it later with a deprecation process or
> migration
> > > >>>>>> process.
> > > >>>>>>>>>> What
> > > >>>>>>>>>>>>> do you think?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>> Xuannan
> > > >>>>>>>>>>>>> On Aug 14, 2023, 14:09 +0800, Xintong Song <
> > > >>>> tonysong...@gmail.com
> > > >>>>>>>>>> <mailto:tonysong...@gmail.com>>,
> > > >>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>> Thanks for the explanation.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> I wonder if it makes sense to not expose this detail via
> > > >> the
> > > >>>>>>>>>>>>> configuration
> > > >>>>>>>>>>>>>> option. To be specific, I suggest not mentioning the
> > > >>>> "watermark"
> > > >>>>>>>>>>>> keyword
> > > >>>>>>>>>>>>> in
> > > >>>>>>>>>>>>>> the configuration key and description.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> - From the users' perspective, I think they only need to
> > > >> know
> > > >>>>>>>> there's
> > > >>>>>>>>>> a
> > > >>>>>>>>>>>>>> lag higher than the given threshold, Flink will consider
> > > >>>> latency
> > > >>>>>> of
> > > >>>>>>>>>>>>>> individual records as less important and prioritize
> > > >> throughput
> > > >>>>>> over
> > > >>>>>>>>>> it.
> > > >>>>>>>>>>>>>> They don't really need the details of how the lags are
> > > >>>> calculated.
> > > >>>>>>>>>>>>>> - For the internal implementation, I also think using
> > > >>>> watermark
> > > >>>>>> lags
> > > >>>>>>>>>> is
> > > >>>>>>>>>>>>>> a good idea, for the reasons you've already mentioned.
> > > >>>> However,
> > > >>>>>> it's
> > > >>>>>>>>>>>> not
> > > >>>>>>>>>>>>>> the only possible option. Hiding this detail from users
> > > >> would
> > > >>>> give
> > > >>>>>>>> us
> > > >>>>>>>>>>>> the
> > > >>>>>>>>>>>>>> flexibility to switch to other approaches if needed in
> > > >> future.
> > > >>>>>>>>>>>>>> - We are currently working on designing the
> > > >> ProcessFunction
> > > >>>> API
> > > >>>>>>>>>>>>>> (consider it as a DataStream API V2). There's an idea to
> > > >>>>>> introduce a
> > > >>>>>>>>>>>>>> Generalized Watermark mechanism, where basically the
> > > >>>> watermark can
> > > >>>>>>>> be
> > > >>>>>>>>>>>>>> anything that needs to travel along the data-flow with
> > > >> certain
> > > >>>>>>>>>>>> alignment
> > > >>>>>>>>>>>>>> strategies, and event time watermark would be one
> specific
> > > >>>> case of
> > > >>>>>>>> it.
> > > >>>>>>>>>>>>> This
> > > >>>>>>>>>>>>>> is still an idea and has not been discussed and agreed
> on
> > > >> by
> > > >>>> the
> > > >>>>>>>>>>>>> community,
> > > >>>>>>>>>>>>>> and we are preparing a FLIP for it. But if we are going
> > > >> for
> > > >>>> it,
> > > >>>>>> the
> > > >>>>>>>>>>>>> concept
> > > >>>>>>>>>>>>>> "watermark-lag-threshold" could be ambiguous.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> I do not intend to block the FLIP on this. I'd also be
> > > >> fine
> > > >>>> with
> > > >>>>>>>>>>>>>> introducing the configuration as is, and changing it
> > > >> later, if
> > > >>>>>>>> needed,
> > > >>>>>>>>>>>>> with
> > > >>>>>>>>>>>>>> a regular deprecation and migration process. Just making
> > > >> my
> > > >>>>>>>>>>>> suggestions.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Xintong
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> On Mon, Aug 14, 2023 at 12:00 PM Xuannan Su <
> > > >>>>>> suxuanna...@gmail.com>
> > > >>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Hi Xintong,
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Thanks for the reply.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> I have considered using the timestamp in the records to
> > > >>>> determine
> > > >>>>>>>> the
> > > >>>>>>>>>>>>>>> backlog status, and decided to use the watermark in the
> end.
> > > >> By
> > > >>>>>>>>>>>> definition,
> > > >>>>>>>>>>>>>>> watermark is the time progress indication in the data
> > > >>>> stream. It
> > > >>>>>>>>>>>>> indicates
> > > >>>>>>>>>>>>>>> the stream’s event time has progressed to some specific
> > > >>>> time. On
> > > >>>>>>>> the
> > > >>>>>>>>>>>>> other
> > > >>>>>>>>>>>>>>> hand, timestamp in the records is usually used to
> > > >> generate
> > > >>>> the
> > > >>>>>>>>>>>>> watermark.
> > > >>>>>>>>>>>>>>> Therefore, it appears more appropriate and intuitive to
> > > >>>> calculate
> > > >>>>>>>> the
> > > >>>>>>>>>>>>> event
> > > >>>>>>>>>>>>>>> time lag by watermark and determine the backlog status.
> > > >> And
> > > >>>> by
> > > >>>>>>>> using
> > > >>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>> watermark, we can easily deal with the out-of-order and
> > > >> the
> > > >>>>>>>> idleness
> > > >>>>>>>>>>>>> of the
> > > >>>>>>>>>>>>>>> data.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Please let me know if you have further questions.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>> Xuannan
> > > >>>>>>>>>>>>>>> On Aug 10, 2023, 20:23 +0800, Xintong Song <
> > > >>>>>> tonysong...@gmail.com>,
> > > >>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>> Thanks for preparing the FLIP, Xuannan.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> +1 in general.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> A quick question, could you explain why we are relying
> > > >> on
> > > >>>> the
> > > >>>>>>>>>>>>> watermark
> > > >>>>>>>>>>>>>>> for
> > > >>>>>>>>>>>>>>>> emitting the record attribute? Why not use timestamps
> > > >> in the
> > > >>>>>>>>>>>>> records? I
> > > >>>>>>>>>>>>>>>> don't see any concern in using watermarks. Just
> > > >> wondering if
> > > >>>>>>>>>>>> there are
> > > >>>>>>>>>>>>> any
> > > >>>>>>>>>>>>>>>> deep considerations behind this.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Xintong
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su <
> > > >>>>>> suxuanna...@gmail.com>
> > > >>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Hi all,
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> I am opening this thread to discuss FLIP-328: Allow
> > > >> source
> > > >>>>>>>>>>>>> operators to
> > > >>>>>>>>>>>>>>>>> determine isProcessingBacklog based on watermark
> > > >> lag[1]. We
> > > >>>>>> had
> > > >>>>>>>>>>>>>>> several
> > > >>>>>>>>>>>>>>>>> discussions with Dong Ling about the design, and
> thanks
> > > >>>> for all
> > > >>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>> valuable advice.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> The FLIP aims to target the use-case where users want
> to
> > > >>>> run a
> > > >>>>>>>>>>>> Flink
> > > >>>>>>>>>>>>>>> job to
> > > >>>>>>>>>>>>>>>>> backfill historical data in a high throughput manner
> > > >> and
> > > >>>>>> continue
> > > >>>>>>>>>>>>>>>>> processing real-time data with low latency. Building
> > > >> upon
> > > >>>> the
> > > >>>>>>>>>>>>> backlog
> > > >>>>>>>>>>>>>>>>> concept introduced in FLIP-309[2], this proposal
> > > >> enables
> > > >>>>>> sources
> > > >>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>> report
> > > >>>>>>>>>>>>>>>>> their status of processing backlog based on the
> > > >> watermark
> > > >>>> lag.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> We would greatly appreciate any comments or feedback
> > > >> you
> > > >>>> may
> > > >>>>>> have
> > > >>>>>>>>>>>>> on
> > > >>>>>>>>>>>>>>> this
> > > >>>>>>>>>>>>>>>>> proposal.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>>> Xuannan
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> [1]
> > > >>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> > > >>>>>>>>>>>>>>>>> [2]
> > > >>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>
> > > >>
> > >
> >
>
