Hi Hang,

Thanks for the review.

This is a good question. It appears that this particular piece of
information is absent from the FLIP document. If I understand
correctly, the term "From the source" refers to a source that
determines the backlog status based on its state. Both the backlog
statuses are determined at the source operator, based on watermark lag
and based on source state. Only when both backlog statuses are false
does the source operator send out the signal "isBacklog=false"
downstream. Essentially, the backlog statuses are combined using a
logical OR operation. I updated the configuration description to
include such behavior.

I hope that addresses your question.

Best,
Xuannan


On Wed, Aug 30, 2023 at 4:31 PM Hang Ruan <ruanhang1...@gmail.com> wrote:
>
> Hi, Xuannan.
>
> Thanks for preparing the FLIP.
>
> After this FLIP, we will have two ways to report isProcessingBacklog: 1.
> From the source; 2. Judged by the watermark lag. What is the priority
> between them?
> For example, what is the status isProcessingBacklog when the source report
> `isProcessingBacklog=false` and the watermark lag exceeds the threshold?
>
> Best,
> Hang
>
> Xuannan Su <suxuanna...@gmail.com> 于2023年8月30日周三 10:06写道:
>
> > Hi Jing,
> >
> > Thank you for the suggestion.
> >
> > The definition of watermark lag is the same as the watermarkLag metric in
> > FLIP-33[1]. More specifically, the watermark lag calculation is computed at
> > the time when a watermark is emitted downstream in the following way:
> > watermarkLag = CurrentTime - Watermark. I have added this description to
> > the FLIP.
> >
> > I hope this addresses your concern.
> >
> > Best,
> > Xuannan
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> >
> >
> > > On Aug 28, 2023, at 01:04, Jing Ge <j...@ververica.com.INVALID> wrote:
> > >
> > > Hi Xuannan,
> > >
> > > Thanks for the proposal. +1 for me.
> > >
> > > There is one tiny thing that I am not sure if I understand it correctly.
> > > Since there will be many different WatermarkStrategies and different
> > > WatermarkGenerators. Could you please update the FLIP and add the
> > > description of how the watermark lag is calculated exactly? E.g.
> > Watermark
> > > lag = A - B with A is the timestamp of the watermark emitted to the
> > > downstream and B is....(this is the part I am not really sure after
> > reading
> > > the FLIP).
> > >
> > > Best regards,
> > > Jing
> > >
> > >
> > > On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su <suxuanna...@gmail.com>
> > wrote:
> > >
> > >> Hi Jark,
> > >>
> > >> Thanks for the comments.
> > >>
> > >> I agree that the current solution cannot support jobs that cannot define
> > >> watermarks. However, after considering the pending-record-based
> > solution, I
> > >> believe the current solution is superior for the target use case as it
> > is
> > >> more intuitive for users. The backlog status gives users the ability to
> > >> balance between throughput and latency. Making this trade-off decision
> > >> based on the watermark lag is more intuitive from the user's
> > perspective.
> > >> For instance, a user can decide that if the job lags behind the current
> > >> time by more than 1 hour, the result is not usable. In that case, we can
> > >> optimize for throughput when the data lags behind by more than an hour.
> > >> With the pending-record-based solution, it's challenging for users to
> > >> determine when to optimize for throughput and when to prioritize
> > latency.
> > >>
> > >> Regarding the limitations of the watermark-based solution:
> > >>
> > >> 1. The current solution can support jobs with sources that have event
> > >> time. Users can always define a watermark at the source operator, even
> > if
> > >> it's not used by downstream operators, such as streaming join and
> > unbounded
> > >> aggregate.
> > >>
> > >> 2.I don't believe it's accurate to say that the watermark lag will keep
> > >> increasing if no data is generated in Kafka. The watermark lag and
> > backlog
> > >> status are determined at the moment when the watermark is emitted to the
> > >> downstream operator. If no data is emitted from the source, the
> > watermark
> > >> lag and backlog status will not be updated. If the WatermarkStrategy
> > with
> > >> idleness is used, the source becomes non-backlog when it becomes idle.
> > >>
> > >> 3. I think watermark lag is more intuitive to determine if a job is
> > >> processing backlog data. Even when using pending records, it faces a
> > >> similar issue. For example, if the source has 1K pending records, those
> > >> records can span from 1 day  to 1 hour to 1 second. If the records span
> > 1
> > >> day, it's probably best to optimize for throughput. If they span 1
> > hour, it
> > >> depends on the business logic. If they span 1 second, optimizing for
> > >> latency is likely the better choice.
> > >>
> > >> In summary, I believe the watermark-based solution is a superior choice
> > >> for the target use case where watermark/event time can be defined.
> > >> Additionally, I haven't come across a scenario that requires low-latency
> > >> processing and reads from a source that cannot define watermarks. If we
> > >> encounter such a use case, we can create another FLIP to address those
> > >> needs in the future. What do you think?
> > >>
> > >>
> > >> Best,
> > >> Xuannan
> > >>
> > >>
> > >>
> > >>> On Aug 20, 2023, at 23:27, Jark Wu <imj...@gmail.com <mailto:
> > >> imj...@gmail.com>> wrote:
> > >>>
> > >>> Hi Xuannan,
> > >>>
> > >>> Thanks for opening this discussion.
> > >>>
> > >>> This current proposal may work in the mentioned watermark cases.
> > >>> However, it seems this is not a general solution for sources to
> > determine
> > >>> "isProcessingBacklog".
> > >>> From my point of view, there are 3 limitations of the current proposal:
> > >>> 1. It doesn't cover jobs that don't have watermark/event-time defined,
> > >>> for example streaming join and unbounded aggregate. We may still need
> > to
> > >>> figure out solutions for them.
> > >>> 2. Watermark lag can not be trusted, because it increases unlimited if
> > no
> > >>> data is generated in the Kafka.
> > >>> But in this case, there is no backlog at all.
> > >>> 3. Watermark lag is hard to reflect the amount of backlog. If the
> > >> watermark
> > >>> lag is 1day or 1 hour or 1second,
> > >>> there is possibly only 1 pending record there, which means no backlog
> > at
> > >>> all.
> > >>>
> > >>> Therefore, IMO, watermark maybe not the ideal metric used to determine
> > >>> "isProcessingBacklog".
> > >>> What we need is something that reflects the number of records
> > unprocessed
> > >>> by the job.
> > >>> Actually, that is the "pendingRecords" metric proposed in FLIP-33 and
> > has
> > >>> been implemented by Kafka source.
> > >>> Did you consider using "pendingRecords" metric to determine
> > >>> "isProcessingBacklog"?
> > >>>
> > >>> Best,
> > >>> Jark
> > >>>
> > >>>
> > >>> [1]
> > >>>
> > >>
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > >> <
> > >>
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> On Tue, 15 Aug 2023 at 12:04, Xintong Song <tonysong...@gmail.com
> > >> <mailto:tonysong...@gmail.com>> wrote:
> > >>>
> > >>>> Sounds good to me.
> > >>>>
> > >>>> It is true that, if we are introducing the generalized watermark,
> > there
> > >>>> will be other watermark related concepts / configurations that need to
> > >> be
> > >>>> updated anyway.
> > >>>>
> > >>>>
> > >>>> Best,
> > >>>>
> > >>>> Xintong
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Tue, Aug 15, 2023 at 11:30 AM Xuannan Su <suxuanna...@gmail.com
> > >> <mailto:suxuanna...@gmail.com>> wrote:
> > >>>>
> > >>>>> Hi Xingtong,
> > >>>>>
> > >>>>> Thank you for your suggestion.
> > >>>>>
> > >>>>> After considering the idea of using a general configuration key, I
> > >> think
> > >>>>> it may not be a good idea for the reasons below.
> > >>>>>
> > >>>>> While I agree that using a more general configuration key provides us
> > >>>> with
> > >>>>> the flexibility to switch to other approaches to calculate the lag in
> > >> the
> > >>>>> future, the downside is that it may cause confusion for users. We
> > >>>> currently
> > >>>>> have fetchEventTimeLag, emitEventTimeLag, and watermarkLag in the
> > >> source,
> > >>>>> and it is not clear which specific lag we are referring to. With the
> > >>>>> potential introduction of the Generalized Watermark mechanism in the
> > >>>>> future, if I understand correctly, a watermark won't necessarily need
> > >> to
> > >>>> be
> > >>>>> a timestamp. I am concern that the general configuration key may not
> > >> be
> > >>>>> enough to cover all the use case and we will need to introduce a
> > >> general
> > >>>>> way to determine the backlog status regardless.
> > >>>>>
> > >>>>> For the reasons above, I prefer introducing the configuration as is,
> > >> and
> > >>>>> change it later with the a deprecation process or migration process.
> > >> What
> > >>>>> do you think?
> > >>>>>
> > >>>>> Best,
> > >>>>> Xuannan
> > >>>>> On Aug 14, 2023, 14:09 +0800, Xintong Song <tonysong...@gmail.com
> > >> <mailto:tonysong...@gmail.com>>,
> > >>>> wrote:
> > >>>>>> Thanks for the explanation.
> > >>>>>>
> > >>>>>> I wonder if it makes sense to not expose this detail via the
> > >>>>> configuration
> > >>>>>> option. To be specific, I suggest not mentioning the "watermark"
> > >>>> keyword
> > >>>>> in
> > >>>>>> the configuration key and description.
> > >>>>>>
> > >>>>>> - From the users' perspective, I think they only need to know
> > there's
> > >> a
> > >>>>>> lag higher than the given threshold, Flink will consider latency of
> > >>>>>> individual records as less important and prioritize throughput over
> > >> it.
> > >>>>>> They don't really need the details of how the lags are calculated.
> > >>>>>> - For the internal implementation, I also think using watermark lags
> > >> is
> > >>>>>> a good idea, for the reasons you've already mentioned. However, it's
> > >>>> not
> > >>>>>> the only possible option. Hiding this detail from users would give
> > us
> > >>>> the
> > >>>>>> flexibility to switch to other approaches if needed in future.
> > >>>>>> - We are currently working on designing the ProcessFunction API
> > >>>>>> (consider it as a DataStream API V2). There's an idea to introduce a
> > >>>>>> Generalized Watermark mechanism, where basically the watermark can
> > be
> > >>>>>> anything that needs to travel along the data-flow with certain
> > >>>> alignment
> > >>>>>> strategies, and event time watermark would be one specific case of
> > it.
> > >>>>> This
> > >>>>>> is still an idea and has not been discussed and agreed on by the
> > >>>>> community,
> > >>>>>> and we are preparing a FLIP for it. But if we are going for it, the
> > >>>>> concept
> > >>>>>> "watermark-lag-threshold" could be ambiguous.
> > >>>>>>
> > >>>>>> I do not intend to block the FLIP on this. I'd also be fine with
> > >>>>>> introducing the configuration as is, and changing it later, if
> > needed,
> > >>>>> with
> > >>>>>> a regular deprecation and migration process. Just making my
> > >>>> suggestions.
> > >>>>>>
> > >>>>>>
> > >>>>>> Best,
> > >>>>>>
> > >>>>>> Xintong
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> On Mon, Aug 14, 2023 at 12:00 PM Xuannan Su <suxuanna...@gmail.com
> > >> <mailto:suxuanna...@gmail.com>>
> > >>>>> wrote:
> > >>>>>>
> > >>>>>>> Hi Xintong,
> > >>>>>>>
> > >>>>>>> Thanks for the reply.
> > >>>>>>>
> > >>>>>>> I have considered using the timestamp in the records to determine
> > the
> > >>>>>>> backlog status, and decided to use watermark at the end. By
> > >>>> definition,
> > >>>>>>> watermark is the time progress indication in the data stream. It
> > >>>>> indicates
> > >>>>>>> the stream’s event time has progressed to some specific time. On
> > the
> > >>>>> other
> > >>>>>>> hand, timestamp in the records is usually used to generate the
> > >>>>> watermark.
> > >>>>>>> Therefore, it appears more appropriate and intuitive to calculate
> > the
> > >>>>> event
> > >>>>>>> time lag by watermark and determine the backlog status. And by
> > using
> > >>>>> the
> > >>>>>>> watermark, we can easily deal with the out-of-order and the
> > idleness
> > >>>>> of the
> > >>>>>>> data.
> > >>>>>>>
> > >>>>>>> Please let me know if you have further questions.
> > >>>>>>>
> > >>>>>>> Best,
> > >>>>>>> Xuannan
> > >>>>>>> On Aug 10, 2023, 20:23 +0800, Xintong Song <tonysong...@gmail.com
> > >> <mailto:tonysong...@gmail.com>>,
> > >>>>> wrote:
> > >>>>>>>> Thanks for preparing the FLIP, Xuannan.
> > >>>>>>>>
> > >>>>>>>> +1 in general.
> > >>>>>>>>
> > >>>>>>>> A quick question, could you explain why we are relying on the
> > >>>>> watermark
> > >>>>>>> for
> > >>>>>>>> emitting the record attribute? Why not use timestamps in the
> > >>>>> records? I
> > >>>>>>>> don't see any concern in using watermarks. Just wondering if
> > >>>> there's
> > >>>>> any
> > >>>>>>>> deep considerations behind this.
> > >>>>>>>>
> > >>>>>>>> Best,
> > >>>>>>>>
> > >>>>>>>> Xintong
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su <suxuanna...@gmail.com
> > >> <mailto:suxuanna...@gmail.com>>
> > >>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi all,
> > >>>>>>>>>
> > >>>>>>>>> I am opening this thread to discuss FLIP-328: Allow source
> > >>>>> operators to
> > >>>>>>>>> determine isProcessingBacklog based on watermark lag[1]. We had a
> > >>>>>>> several
> > >>>>>>>>> discussions with Dong Ling about the design, and thanks for all
> > >>>> the
> > >>>>>>>>> valuable advice.
> > >>>>>>>>>
> > >>>>>>>>> The FLIP aims to target the use-case where user want to run a
> > >>>> Flink
> > >>>>>>> job to
> > >>>>>>>>> backfill historical data in a high throughput manner and continue
> > >>>>>>>>> processing real-time data with low latency. Building upon the
> > >>>>> backlog
> > >>>>>>>>> concept introduced in FLIP-309[2], this proposal enables sources
> > >>>> to
> > >>>>>>> report
> > >>>>>>>>> their status of processing backlog based on the watermark lag.
> > >>>>>>>>>
> > >>>>>>>>> We would greatly appreciate any comments or feedback you may have
> > >>>>> on
> > >>>>>>> this
> > >>>>>>>>> proposal.
> > >>>>>>>>>
> > >>>>>>>>> Best,
> > >>>>>>>>> Xuannan
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> [1]
> > >>>>>>>>>
> > >>>>>>>
> > >>>>>
> > >>>>
> > >>
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> > >> <
> > >>
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> > >>>
> > >>>>>>>>> [2]
> > >>>>>>>>>
> > >>>>>>>
> > >>>>>
> > >>>>
> > >>
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > >> <
> > >>
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> >
> >
> >

Reply via email to