Hi Xuannan,

I thought FLIP-328 would compete with FLIP-309 when setting the value of
the backlog status. Understood. Thanks for the hint.

Best regards,
Jing

On Wed, Sep 6, 2023 at 12:12 PM Xuannan Su <suxuanna...@gmail.com> wrote:

> Hi Jing,
>
> Thank you for the clarification.
>
> For the use case you mentioned, I believe we can utilize the
> HybridSource, as updated in FLIP-309[1], to determine the backlog
> status. For example, if the user wants to process data before time T
> in batch mode and after time T in stream mode, they can set the first
> source of the HybridSource to read up to time T and the last source of
> the HybridSource to read from time T.
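A minimal, self-contained Java sketch of the split described above. It assumes, as the suggestion implies, that the hybrid source reports backlog while any sub-source other than the last one is active. SubSource, HybridBacklogSketch, splitAt, and isProcessingBacklog are hypothetical names for illustration only, not Flink's HybridSource API.

```java
import java.util.List;

/** Hypothetical description of one sub-source of a hybrid source (not a Flink class). */
final class SubSource {
    final String name;
    final long fromMillis; // inclusive start of the event-time range this sub-source reads
    final long toMillis;   // exclusive end; Long.MAX_VALUE means unbounded

    SubSource(String name, long fromMillis, long toMillis) {
        this.name = name;
        this.fromMillis = fromMillis;
        this.toMillis = toMillis;
    }
}

/** Sketch of "batch before time T, streaming from time T" using two sub-sources. */
final class HybridBacklogSketch {
    private HybridBacklogSketch() {}

    /** Splits at boundary t: history reads [start, t), live reads [t, unbounded). */
    static List<SubSource> splitAt(long startMillis, long t) {
        return List.of(
                new SubSource("history", startMillis, t),
                new SubSource("live", t, Long.MAX_VALUE));
    }

    /** Assumed rule: every sub-source except the last one counts as backlog. */
    static boolean isProcessingBacklog(List<SubSource> sources, int activeIndex) {
        return activeIndex < sources.size() - 1;
    }
}
```

While the "history" sub-source (index 0) is active the job would run with throughput-oriented settings; once the switch to "live" (index 1) happens, the backlog flag clears.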
>
> Best,
> Xuannan
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
>
>
> On Mon, Sep 4, 2023 at 10:36 PM Jing Ge <j...@ververica.com.invalid> wrote:
> >
> > Hi Xuannan,
> >
> > Thanks for the clarification.
> >
> > 3. Event time and processing time are two different things. It might be
> > rarely used, but conceptually, users can process data from a specific time
> > range in the past in streaming mode. All data before that range would be
> > considered backlog and need to be processed in batch mode, much like the
> > present perfect progressive tense in English.
> >
> > Best regards,
> > Jing
> >
> > On Thu, Aug 31, 2023 at 4:45 AM Xuannan Su <suxuanna...@gmail.com> wrote:
> >
> > > Hi Jing,
> > >
> > > Thanks for the reply.
> > >
> > > 1. You are absolutely right that the watermark lag threshold must be
> > > carefully set with a thorough understanding of watermark generation. It
> > > is crucial for users to take into account the WatermarkStrategy when
> > > setting the watermark lag threshold.
> > >
> > > 2. Regarding pure processing-time based stream processing jobs,
> > > alternative strategies will be implemented to determine whether the job
> > > is processing backlog data. I have outlined two possible strategies below:
> > >
> > > - Based on the source operator's state. For example, when the MySQL CDC
> > > source is reading the snapshot, it can claim isBacklog=true.
> > > - Based on metrics. For example, when busyTimeMsPerSecond (or
> > > backPressuredTimeMsPerSecond) > user_specified_threshold, then
> > > isBacklog=true.
> > >
> > > The strategy proposed in this FLIP relies on generated watermarks.
> > > Therefore, if a user intends for the job to detect the backlog status
> > > based on watermarks, the watermarks must be generated.
> > >
> > > 3. I'm afraid I'm not fully grasping your question. From my
> > > understanding, it should work in both cases. When event times are close
> > > to the processing time, resulting in watermarks close to the processing
> > > time, the job is not processing backlog data. On the other hand, when
> > > event times are far from the processing time, causing the watermarks to
> > > be distant as well, the job is considered to be processing backlog data
> > > if the lag surpasses the defined threshold.
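The two alternative strategies from point 2 can be sketched in Java as plain predicates. This is an illustration under stated assumptions: CdcPhase, BacklogStrategies, fromSourceState, and fromMetric are hypothetical names (not the MySQL CDC connector's API), and the metric value is assumed to be a per-second reading of busyTimeMsPerSecond or backPressuredTimeMsPerSecond, i.e. a value between 0 and 1000.

```java
/** Hypothetical reading phases of a CDC-style source (not the real connector API). */
enum CdcPhase { SNAPSHOT, CHANGELOG }

final class BacklogStrategies {
    private BacklogStrategies() {}

    /** Source-state strategy: the source claims isBacklog=true while reading the snapshot. */
    static boolean fromSourceState(CdcPhase phase) {
        return phase == CdcPhase.SNAPSHOT;
    }

    /**
     * Metric strategy: isBacklog=true when the metric exceeds a user-specified threshold.
     * metricMsPerSecond stands for busyTimeMsPerSecond or backPressuredTimeMsPerSecond,
     * both of which report milliseconds per second (0 to 1000).
     */
    static boolean fromMetric(long metricMsPerSecond, long userSpecifiedThreshold) {
        if (metricMsPerSecond < 0 || metricMsPerSecond > 1000) {
            throw new IllegalArgumentException("expected a value in [0, 1000] ms/s");
        }
        return metricMsPerSecond > userSpecifiedThreshold;
    }
}
```

Neither strategy needs watermarks, which is why they could cover the processing-time jobs that the watermark-lag strategy leaves out.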
> > >
> > > Best,
> > > Xuannan
> > >
> > >
> > > > On Aug 31, 2023, at 02:56, Jing Ge <j...@ververica.com.INVALID> wrote:
> > > >
> > > > Hi Xuannan,
> > > >
> > > > Thanks for the clarification. That is the part where I am trying to
> > > > understand your thoughts. I have some follow-up questions:
> > > >
> > > > 1. It depends strongly on the WatermarkStrategy and on what the
> > > > customized watermark generation looks like. It mixes business logic
> > > > with the technical implementation and the technical data processing
> > > > mode. The value of the watermark lag threshold must be set very
> > > > carefully. If the value is too small, then whenever the watermark
> > > > generation logic changes (e.g. business logic changes cause the
> > > > threshold to be exceeded), the same job might unexpectedly run in
> > > > backlog processing mode, i.e. a butterfly effect. Comprehensive
> > > > documentation is required to avoid confusing users.
> > > > 2. As Jark already mentioned, use cases that do not have watermarks,
> > > > like pure processing-time based stream processing[1], are not covered.
> > > > It is more or less a trade-off solution that does not support such use
> > > > cases, and appropriate documentation is required. Forcing users to
> > > > explicitly generate watermarks that are never needed just for this
> > > > purpose does not sound like a proper solution.
> > > > 3. If I am not mistaken, it only works for use cases where event times
> > > > are very close to the processing times, because the wall clock is used
> > > > to calculate the watermark lag and the watermark is generated based on
> > > > the event time.
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > > [1]
> > > > https://github.com/apache/flink/blob/2c50b4e956305426f478b726d4de4a640a16b810/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L236
> > > >
> > > > On Wed, Aug 30, 2023 at 4:06 AM Xuannan Su <suxuanna...@gmail.com> wrote:
> > > >
> > > >> Hi Jing,
> > > >>
> > > >> Thank you for the suggestion.
> > > >>
> > > >> The definition of watermark lag is the same as the watermarkLag metric
> > > >> in FLIP-33[1]. More specifically, the watermark lag is computed at the
> > > >> time when a watermark is emitted downstream, as follows:
> > > >> watermarkLag = CurrentTime - Watermark. I have added this description
> > > >> to the FLIP.
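The definition above translates directly into code. Here is a minimal Java sketch. It assumes the lag is compared against a user-configured threshold in milliseconds and that the job counts as processing backlog when the lag exceeds that threshold. WatermarkLagDetector and its methods are hypothetical names, not part of Flink. The wall-clock time is passed in explicitly so the behaviour is deterministic.

```java
/**
 * Hypothetical sketch: watermarkLag = currentTime - watermark, evaluated only at the
 * moment a watermark is emitted downstream, then compared against a threshold.
 */
final class WatermarkLagDetector {
    private final long thresholdMillis;
    private boolean processingBacklog; // last decision; unchanged between watermarks

    WatermarkLagDetector(long thresholdMillis) {
        this.thresholdMillis = thresholdMillis;
    }

    /** watermarkLag = CurrentTime - Watermark (both in epoch milliseconds). */
    static long watermarkLag(long currentTimeMillis, long watermarkMillis) {
        return currentTimeMillis - watermarkMillis;
    }

    /** Called when a watermark is emitted; returns the updated backlog status. */
    boolean onWatermarkEmitted(long watermarkMillis, long currentTimeMillis) {
        processingBacklog = watermarkLag(currentTimeMillis, watermarkMillis) > thresholdMillis;
        return processingBacklog;
    }

    boolean isProcessingBacklog() {
        return processingBacklog;
    }
}
```

With a 1-hour threshold, replaying two-day-old data gives a lag of about 48 hours, so `onWatermarkEmitted` returns true. Once the job catches up and watermarks trail the wall clock by only seconds, the same call returns false.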
> > > >>
> > > >> I hope this addresses your concern.
> > > >>
> > > >> Best,
> > > >> Xuannan
> > > >>
> > > >> [1]
> > > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > >>
> > > >>
> > > >>> On Aug 28, 2023, at 01:04, Jing Ge <j...@ververica.com.INVALID> wrote:
> > > >>>
> > > >>> Hi Xuannan,
> > > >>>
> > > >>> Thanks for the proposal. +1 for me.
> > > >>>
> > > >>> There is one tiny thing that I am not sure I understand correctly.
> > > >>> Since there will be many different WatermarkStrategies and
> > > >>> WatermarkGenerators, could you please update the FLIP and add a
> > > >>> description of exactly how the watermark lag is calculated? E.g.
> > > >>> watermark lag = A - B, where A is the timestamp of the watermark
> > > >>> emitted downstream and B is... (this is the part I am not really sure
> > > >>> about after reading the FLIP).
> > > >>>
> > > >>> Best regards,
> > > >>> Jing
> > > >>>
> > > >>>
> > > >>> On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su <suxuanna...@gmail.com> wrote:
> > > >>>
> > > >>>> Hi Jark,
> > > >>>>
> > > >>>> Thanks for the comments.
> > > >>>>
> > > >>>> I agree that the current solution cannot support jobs that cannot
> > > >>>> define watermarks. However, after considering the pending-record-based
> > > >>>> solution, I believe the current solution is superior for the target
> > > >>>> use case, as it is more intuitive for users. The backlog status gives
> > > >>>> users the ability to balance throughput and latency, and making this
> > > >>>> trade-off decision based on the watermark lag is more intuitive from
> > > >>>> the user's perspective. For instance, a user can decide that if the job
> > > >>>> lags behind the current time by more than 1 hour, the result is not
> > > >>>> usable. In that case, we can optimize for throughput when the data lags
> > > >>>> behind by more than an hour. With the pending-record-based solution,
> > > >>>> it's challenging for users to determine when to optimize for
> > > >>>> throughput and when to prioritize latency.
> > > >>>>
> > > >>>> Regarding the limitations of the watermark-based solution:
> > > >>>>
> > > >>>> 1. The current solution can support jobs with sources that have event
> > > >>>> time. Users can always define a watermark at the source operator, even
> > > >>>> if it's not used by downstream operators, such as streaming join and
> > > >>>> unbounded aggregate.
> > > >>>>
> > > >>>> 2. I don't believe it's accurate to say that the watermark lag will
> > > >>>> keep increasing if no data is generated in Kafka. The watermark lag
> > > >>>> and backlog status are determined at the moment the watermark is
> > > >>>> emitted to the downstream operator. If no data is emitted from the
> > > >>>> source, the watermark lag and backlog status will not be updated. If a
> > > >>>> WatermarkStrategy with idleness is used, the source becomes
> > > >>>> non-backlog when it becomes idle.
> > > >>>>
> > > >>>> 3. I think watermark lag is a more intuitive way to determine whether
> > > >>>> a job is processing backlog data. Pending records face a similar
> > > >>>> issue. For example, if the source has 1K pending records, those
> > > >>>> records can span 1 day, 1 hour, or 1 second. If the records span 1
> > > >>>> day, it's probably best to optimize for throughput. If they span 1
> > > >>>> hour, it depends on the business logic. If they span 1 second,
> > > >>>> optimizing for latency is likely the better choice.
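Point 3 can be made concrete with a small Java sketch. It models a backlog as the event timestamps of records still pending in the source, which is a hypothetical simplification; PendingVsSpan, backlogByCount, spanMillis, and evenlySpaced are illustrative names only. Three backlogs with the same 1K pending records get the same answer from a count-based rule, while their time spans differ by orders of magnitude.

```java
/** Hypothetical comparison of a pending-count signal with an event-time-span signal. */
final class PendingVsSpan {
    private PendingVsSpan() {}

    /** Count-based rule: backlog when the number of pending records exceeds a threshold. */
    static boolean backlogByCount(long[] pendingTimestamps, int maxPendingRecords) {
        return pendingTimestamps.length > maxPendingRecords;
    }

    /** Time span covered by the pending records: newest minus oldest event timestamp. */
    static long spanMillis(long[] pendingTimestamps) {
        if (pendingTimestamps.length == 0) {
            return 0L;
        }
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (long ts : pendingTimestamps) {
            min = Math.min(min, ts);
            max = Math.max(max, ts);
        }
        return max - min;
    }

    /** Builds n evenly spaced timestamps that cover spanMillis and end at endMillis. */
    static long[] evenlySpaced(int n, long spanMillis, long endMillis) {
        long[] ts = new long[n];
        for (int i = 0; i < n; i++) {
            long offset = (n == 1) ? spanMillis : spanMillis * i / (n - 1);
            ts[i] = endMillis - spanMillis + offset;
        }
        return ts;
    }
}
```

Building three arrays with `evenlySpaced(1_000, span, end)` for spans of one day, one hour, and one second makes `backlogByCount(..., 500)` true for all three. `spanMillis` returns exactly the three different spans, which is the distinction the argument above relies on.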
> > > >>>>
> > > >>>> In summary, I believe the watermark-based solution is a superior
> > > >>>> choice for the target use case where watermark/event time can be
> > > >>>> defined. Additionally, I haven't come across a scenario that requires
> > > >>>> low-latency processing and reads from a source that cannot define
> > > >>>> watermarks. If we encounter such a use case, we can create another
> > > >>>> FLIP to address those needs in the future. What do you think?
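Point 2 of the reply above can be sketched as a tiny state holder in Java. The sketch assumes the backlog status changes only on two events: a watermark being emitted downstream, or the source being marked idle by a WatermarkStrategy with idleness. Wall-clock time passing with no data (for example, an empty Kafka topic) triggers neither event, so the status stays as it was. BacklogStatusTracker is a hypothetical name, not a Flink class.

```java
/** Hypothetical tracker: backlog status changes only on watermark emission or idleness. */
final class BacklogStatusTracker {
    private final long thresholdMillis;
    private boolean backlog;

    BacklogStatusTracker(long thresholdMillis, boolean initialBacklog) {
        this.thresholdMillis = thresholdMillis;
        this.backlog = initialBacklog;
    }

    /** Re-evaluated only here: lag = wall-clock time at emission minus the watermark. */
    void onWatermarkEmitted(long watermarkMillis, long nowMillis) {
        backlog = nowMillis - watermarkMillis > thresholdMillis;
    }

    /** A source marked idle (WatermarkStrategy with idleness) is treated as non-backlog. */
    void onIdle() {
        backlog = false;
    }

    boolean isBacklog() {
        return backlog;
    }
}
```

Because there is no timer that recomputes the lag, an empty topic cannot push the source into backlog on its own. This is the behaviour point 2 contrasts with an unboundedly growing lag.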
> > > >>>>
> > > >>>>
> > > >>>> Best,
> > > >>>> Xuannan
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>>> On Aug 20, 2023, at 23:27, Jark Wu <imj...@gmail.com> wrote:
> > > >>>>>
> > > >>>>> Hi Xuannan,
> > > >>>>>
> > > >>>>> Thanks for opening this discussion.
> > > >>>>>
> > > >>>>> This current proposal may work in the mentioned watermark cases.
> > > >>>>> However, it does not seem to be a general solution for sources to
> > > >>>>> determine "isProcessingBacklog".
> > > >>>>> From my point of view, the current proposal has 3 limitations:
> > > >>>>> 1. It doesn't cover jobs that don't have watermark/event-time
> > > >>>>> defined, for example streaming join and unbounded aggregate. We may
> > > >>>>> still need to figure out solutions for them.
> > > >>>>> 2. Watermark lag cannot be trusted, because it increases without
> > > >>>>> bound if no data is generated in Kafka. But in this case, there is no
> > > >>>>> backlog at all.
> > > >>>>> 3. Watermark lag hardly reflects the amount of backlog. Whether the
> > > >>>>> watermark lag is 1 day, 1 hour, or 1 second, there may be only 1
> > > >>>>> pending record, which means no backlog at all.
> > > >>>>>
> > > >>>>> Therefore, IMO, the watermark may not be the ideal metric for
> > > >>>>> determining "isProcessingBacklog". What we need is something that
> > > >>>>> reflects the number of records not yet processed by the job. That is
> > > >>>>> exactly the "pendingRecords" metric proposed in FLIP-33[1], which has
> > > >>>>> been implemented by the Kafka source. Did you consider using the
> > > >>>>> "pendingRecords" metric to determine "isProcessingBacklog"?
> > > >>>>>
> > > >>>>> Best,
> > > >>>>> Jark
> > > >>>>>
> > > >>>>>
> > > >>>>> [1]
> > > >>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> On Tue, 15 Aug 2023 at 12:04, Xintong Song <tonysong...@gmail.com> wrote:
> > > >>>>>
> > > >>>>>> Sounds good to me.
> > > >>>>>>
> > > >>>>>> It is true that, if we are introducing the generalized watermark,
> > > >>>>>> there will be other watermark-related concepts / configurations that
> > > >>>>>> need to be updated anyway.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> Best,
> > > >>>>>>
> > > >>>>>> Xintong
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On Tue, Aug 15, 2023 at 11:30 AM Xuannan Su <suxuanna...@gmail.com> wrote:
> > > >>>>>>
> > > >>>>>>> Hi Xintong,
> > > >>>>>>>
> > > >>>>>>> Thank you for your suggestion.
> > > >>>>>>>
> > > >>>>>>> After considering the idea of using a general configuration key, I
> > > >>>>>>> think it may not be a good idea, for the reasons below.
> > > >>>>>>>
> > > >>>>>>> While I agree that using a more general configuration key provides
> > > >>>>>>> us with the flexibility to switch to other approaches to calculating
> > > >>>>>>> the lag in the future, the downside is that it may confuse users. We
> > > >>>>>>> currently have fetchEventTimeLag, emitEventTimeLag, and watermarkLag
> > > >>>>>>> in the source, and it is not clear which specific lag we are
> > > >>>>>>> referring to. With the potential introduction of the Generalized
> > > >>>>>>> Watermark mechanism in the future, if I understand correctly, a
> > > >>>>>>> watermark won't necessarily need to be a timestamp. I am concerned
> > > >>>>>>> that the general configuration key may not be enough to cover all
> > > >>>>>>> the use cases, and we will need to introduce a general way to
> > > >>>>>>> determine the backlog status regardless.
> > > >>>>>>>
> > > >>>>>>> For the reasons above, I prefer introducing the configuration as
> > > >>>>>>> is, and changing it later with a deprecation or migration process.
> > > >>>>>>> What do you think?
> > > >>>>>>>
> > > >>>>>>> Best,
> > > >>>>>>> Xuannan
> > > >>>>>>> On Aug 14, 2023, 14:09 +0800, Xintong Song <tonysong...@gmail.com>, wrote:
> > > >>>>>>>> Thanks for the explanation.
> > > >>>>>>>>
> > > >>>>>>>> I wonder if it makes sense to not expose this detail via the
> > > >>>>>>>> configuration option. To be specific, I suggest not mentioning the
> > > >>>>>>>> "watermark" keyword in the configuration key and description.
> > > >>>>>>>>
> > > >>>>>>>> - From the users' perspective, I think they only need to know that
> > > >>>>>>>> if there's a lag higher than the given threshold, Flink will
> > > >>>>>>>> consider the latency of individual records less important and
> > > >>>>>>>> prioritize throughput over it. They don't really need the details
> > > >>>>>>>> of how the lags are calculated.
> > > >>>>>>>> - For the internal implementation, I also think using watermark
> > > >>>>>>>> lags is a good idea, for the reasons you've already mentioned.
> > > >>>>>>>> However, it's not the only possible option. Hiding this detail from
> > > >>>>>>>> users would give us the flexibility to switch to other approaches
> > > >>>>>>>> if needed in the future.
> > > >>>>>>>> - We are currently working on designing the ProcessFunction API
> > > >>>>>>>> (consider it as a DataStream API V2). There's an idea to introduce
> > > >>>>>>>> a Generalized Watermark mechanism, where basically the watermark
> > > >>>>>>>> can be anything that needs to travel along the data-flow with
> > > >>>>>>>> certain alignment strategies, and the event time watermark would be
> > > >>>>>>>> one specific case of it. This is still an idea that has not been
> > > >>>>>>>> discussed or agreed on by the community, and we are preparing a
> > > >>>>>>>> FLIP for it. But if we go for it, the concept
> > > >>>>>>>> "watermark-lag-threshold" could be ambiguous.
> > > >>>>>>>>
> > > >>>>>>>> I do not intend to block the FLIP on this. I'd also be fine with
> > > >>>>>>>> introducing the configuration as is, and changing it later, if
> > > >>>>>>>> needed, with a regular deprecation and migration process. Just
> > > >>>>>>>> making my suggestions.
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> Best,
> > > >>>>>>>>
> > > >>>>>>>> Xintong
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> On Mon, Aug 14, 2023 at 12:00 PM Xuannan Su <suxuanna...@gmail.com> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> Hi Xintong,
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks for the reply.
> > > >>>>>>>>>
> > > >>>>>>>>> I considered using the timestamps in the records to determine the
> > > >>>>>>>>> backlog status, and decided to use the watermark in the end. By
> > > >>>>>>>>> definition, the watermark is the indication of time progress in
> > > >>>>>>>>> the data stream: it indicates that the stream's event time has
> > > >>>>>>>>> progressed to some specific time. On the other hand, the
> > > >>>>>>>>> timestamps in the records are usually used to generate the
> > > >>>>>>>>> watermark. Therefore, it appears more appropriate and intuitive to
> > > >>>>>>>>> calculate the event time lag from the watermark and determine the
> > > >>>>>>>>> backlog status accordingly. And by using the watermark, we can
> > > >>>>>>>>> easily deal with out-of-order data and idleness.
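A small Java sketch of why the watermark is a steadier basis for the lag than raw record timestamps when data arrives out of order. It follows the bounded-out-of-orderness approach, where the watermark is the highest timestamp seen minus the allowed out-of-orderness minus 1 ms, as Flink's built-in bounded-out-of-orderness generator does. It is still a standalone illustration rather than Flink code, and OutOfOrderWatermarkSketch is a hypothetical name. A late record moves a per-record lag back and forth, while the watermark never regresses.

```java
/** Hypothetical, standalone bounded-out-of-orderness watermark tracker. */
final class OutOfOrderWatermarkSketch {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestamp;

    OutOfOrderWatermarkSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start low enough that the first watermark is Long.MIN_VALUE without underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
    }

    /** Records an event; a late (out-of-order) event never moves the maximum backwards. */
    void onEvent(long eventTimestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMillis);
    }

    /** The watermark: event time is assumed to have progressed at least this far. */
    long currentWatermark() {
        return maxTimestamp - maxOutOfOrdernessMillis - 1;
    }

    /** Lag of a single record relative to the wall clock (the alternative considered). */
    static long recordLag(long nowMillis, long eventTimestampMillis) {
        return nowMillis - eventTimestampMillis;
    }
}
```

With a 5,000 ms bound, feed events at 10,000, 4,000 (late), and 12,000 ms and take now = 20,000 ms. The per-record lags are 10,000, 16,000, and 8,000 ms, jumping back and forth. The watermark lags are 15,001, 15,001, and 13,001 ms, which never increase while data keeps arriving at a fixed wall-clock time.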
> > > >>>>>>>>>
> > > >>>>>>>>> Please let me know if you have further questions.
> > > >>>>>>>>>
> > > >>>>>>>>> Best,
> > > >>>>>>>>> Xuannan
> > > >>>>>>>>> On Aug 10, 2023, 20:23 +0800, Xintong Song <tonysong...@gmail.com>, wrote:
> > > >>>>>>>>>> Thanks for preparing the FLIP, Xuannan.
> > > >>>>>>>>>>
> > > >>>>>>>>>> +1 in general.
> > > >>>>>>>>>>
> > > >>>>>>>>>> A quick question: could you explain why we are relying on the
> > > >>>>>>>>>> watermark for emitting the record attribute? Why not use
> > > >>>>>>>>>> timestamps in the records? I don't see any concern in using
> > > >>>>>>>>>> watermarks. Just wondering if there are any deep considerations
> > > >>>>>>>>>> behind this.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Best,
> > > >>>>>>>>>>
> > > >>>>>>>>>> Xintong
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su <suxuanna...@gmail.com> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>>> Hi all,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> I am opening this thread to discuss FLIP-328: Allow source
> > > >>>>>>>>>>> operators to determine isProcessingBacklog based on watermark
> > > >>>>>>>>>>> lag[1]. We had several discussions with Dong Ling about the
> > > >>>>>>>>>>> design, and thanks to him for all the valuable advice.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> The FLIP targets the use case where users want to run a Flink
> > > >>>>>>>>>>> job to backfill historical data with high throughput and then
> > > >>>>>>>>>>> continue processing real-time data with low latency. Building
> > > >>>>>>>>>>> upon the backlog concept introduced in FLIP-309[2], this
> > > >>>>>>>>>>> proposal enables sources to report their backlog processing
> > > >>>>>>>>>>> status based on the watermark lag.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> We would greatly appreciate any comments or feedback you may
> > > >>>>>>>>>>> have on this proposal.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Best,
> > > >>>>>>>>>>> Xuannan
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> [1]
> > > >>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> > > >>>>>>>>>>> [2]
> > > >>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > > >>
> > > >>
> > > >>
> > >
> > >
>
