Hi, Xuannan.

Thanks for preparing the FLIP.

After this FLIP, we will have two ways to report isProcessingBacklog: (1)
reported by the source; (2) derived from the watermark lag. Which of the two
takes priority?
For example, what is the value of isProcessingBacklog when the source reports
`isProcessingBacklog=false` and the watermark lag exceeds the threshold?
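
To make the scenario concrete, here is a minimal sketch in plain Java (it
does not use any Flink API). It assumes the lag definition from Xuannan's
quoted reply below, watermarkLag = CurrentTime - Watermark. The class name,
the method names, and the 1-hour threshold are hypothetical and chosen only
for illustration. It computes the two signals and deliberately does not
combine them, since how to combine them is exactly the open question.

```java
import java.time.Duration;

public class BacklogSignals {

    // Lag as defined in the quoted reply: watermarkLag = CurrentTime - Watermark.
    static long watermarkLagMillis(long currentTimeMillis, long watermarkMillis) {
        return currentTimeMillis - watermarkMillis;
    }

    // Hypothetical helper: true when the lag is strictly greater than the threshold.
    static boolean lagExceedsThreshold(long lagMillis, Duration threshold) {
        return lagMillis > threshold.toMillis();
    }

    public static void main(String[] args) {
        Duration threshold = Duration.ofHours(1); // hypothetical threshold value
        long now = System.currentTimeMillis();
        long watermark = now - Duration.ofHours(2).toMillis(); // watermark is 2 hours behind

        // Signal 1: the source itself reports that it is not processing backlog.
        boolean reportedBySource = false;
        // Signal 2: the status derived from the watermark lag.
        boolean derivedFromLag =
                lagExceedsThreshold(watermarkLagMillis(now, watermark), threshold);

        // The two signals disagree here; which one wins is the question above.
        System.out.println("reportedBySource=" + reportedBySource
                + ", derivedFromLag=" + derivedFromLag);
    }
}
```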

Best,
Hang

On Wed, Aug 30, 2023 at 10:06, Xuannan Su <suxuanna...@gmail.com> wrote:

> Hi Jing,
>
> Thank you for the suggestion.
>
> The definition of watermark lag is the same as the watermarkLag metric in
> FLIP-33[1]. More specifically, the watermark lag is computed at the time a
> watermark is emitted downstream, as follows:
> watermarkLag = CurrentTime - Watermark. I have added this description to
> the FLIP.
>
> I hope this addresses your concern.
>
> Best,
> Xuannan
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>
>
> > On Aug 28, 2023, at 01:04, Jing Ge <j...@ververica.com.INVALID> wrote:
> >
> > Hi Xuannan,
> >
> > Thanks for the proposal. +1 for me.
> >
> > There is one small thing I am not sure I understand correctly. Since there
> > will be many different WatermarkStrategies and WatermarkGenerators, could
> > you please update the FLIP and describe exactly how the watermark lag is
> > calculated? E.g. watermark lag = A - B, where A is the timestamp of the
> > watermark emitted downstream and B is.... (this is the part I am not sure
> > about after reading the FLIP).
> >
> > Best regards,
> > Jing
> >
> >
> > On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su <suxuanna...@gmail.com> wrote:
> >
> >> Hi Jark,
> >>
> >> Thanks for the comments.
> >>
> >> I agree that the current solution cannot support jobs that cannot define
> >> watermarks. However, after considering the pending-record-based solution,
> >> I believe the current solution is superior for the target use case as it
> >> is more intuitive for users. The backlog status gives users the ability to
> >> balance between throughput and latency. Making this trade-off decision
> >> based on the watermark lag is more intuitive from the user's perspective.
> >> For instance, a user can decide that if the job lags behind the current
> >> time by more than 1 hour, the result is not usable. In that case, we can
> >> optimize for throughput when the data lags behind by more than an hour.
> >> With the pending-record-based solution, it's challenging for users to
> >> determine when to optimize for throughput and when to prioritize latency.
> >>
> >> Regarding the limitations of the watermark-based solution:
> >>
> >> 1. The current solution can support jobs with sources that have event
> >> time. Users can always define a watermark at the source operator, even if
> >> it's not used by downstream operators, such as streaming join and
> >> unbounded aggregate.
> >>
> >> 2. I don't believe it's accurate to say that the watermark lag will keep
> >> increasing if no data is generated in Kafka. The watermark lag and backlog
> >> status are determined at the moment when the watermark is emitted to the
> >> downstream operator. If no data is emitted from the source, the watermark
> >> lag and backlog status will not be updated. If a WatermarkStrategy with
> >> idleness is used, the source becomes non-backlog when it becomes idle.
> >>
> >> 3. I think watermark lag is a more intuitive way to determine whether a
> >> job is processing backlog data. The pending-records approach faces a
> >> similar issue. For example, if the source has 1K pending records, those
> >> records can span 1 day, 1 hour, or 1 second. If the records span 1 day,
> >> it's probably best to optimize for throughput. If they span 1 hour, it
> >> depends on the business logic. If they span 1 second, optimizing for
> >> latency is likely the better choice.
> >>
> >> In summary, I believe the watermark-based solution is a superior choice
> >> for the target use case where watermark/event time can be defined.
> >> Additionally, I haven't come across a scenario that requires low-latency
> >> processing and reads from a source that cannot define watermarks. If we
> >> encounter such a use case, we can create another FLIP to address those
> >> needs in the future. What do you think?
> >>
> >>
> >> Best,
> >> Xuannan
> >>
> >>
> >>
> >>> On Aug 20, 2023, at 23:27, Jark Wu <imj...@gmail.com> wrote:
> >>>
> >>> Hi Xuannan,
> >>>
> >>> Thanks for opening this discussion.
> >>>
> >>> The current proposal may work in the watermark cases mentioned.
> >>> However, it does not seem to be a general solution for sources to
> >>> determine "isProcessingBacklog".
> >>> From my point of view, the current proposal has 3 limitations:
> >>> 1. It doesn't cover jobs that don't have watermark/event-time defined,
> >>> for example streaming join and unbounded aggregate. We may still need to
> >>> figure out solutions for them.
> >>> 2. Watermark lag cannot be trusted, because it increases without bound if
> >>> no data is generated in Kafka.
> >>> But in this case, there is no backlog at all.
> >>> 3. Watermark lag does not reflect the amount of backlog well. Whether the
> >>> watermark lag is 1 day, 1 hour, or 1 second,
> >>> there may be only 1 pending record, which means no backlog at all.
> >>>
> >>> Therefore, IMO, watermark may not be the ideal metric for determining
> >>> "isProcessingBacklog".
> >>> What we need is something that reflects the number of records not yet
> >>> processed by the job.
> >>> Actually, that is the "pendingRecords" metric proposed in FLIP-33 and
> >>> already implemented by the Kafka source.
> >>> Did you consider using the "pendingRecords" metric to determine
> >>> "isProcessingBacklog"?
> >>>
> >>> Best,
> >>> Jark
> >>>
> >>>
> >>> [1]
> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> >>>
> >>>
> >>>
> >>> On Tue, 15 Aug 2023 at 12:04, Xintong Song <tonysong...@gmail.com> wrote:
> >>>
> >>>> Sounds good to me.
> >>>>
> >>>> It is true that, if we are introducing the generalized watermark, there
> >>>> will be other watermark-related concepts / configurations that need to
> >>>> be updated anyway.
> >>>>
> >>>>
> >>>> Best,
> >>>>
> >>>> Xintong
> >>>>
> >>>>
> >>>>
> >>>> On Tue, Aug 15, 2023 at 11:30 AM Xuannan Su <suxuanna...@gmail.com> wrote:
> >>>>
> >>>>> Hi Xintong,
> >>>>>
> >>>>> Thank you for your suggestion.
> >>>>>
> >>>>> After considering the idea of using a general configuration key, I
> >>>>> think it may not be a good idea for the reasons below.
> >>>>>
> >>>>> While I agree that using a more general configuration key provides us
> >>>>> with the flexibility to switch to other approaches to calculate the lag
> >>>>> in the future, the downside is that it may cause confusion for users. We
> >>>>> currently have fetchEventTimeLag, emitEventTimeLag, and watermarkLag in
> >>>>> the source, and it is not clear which specific lag we are referring to.
> >>>>> With the potential introduction of the Generalized Watermark mechanism
> >>>>> in the future, if I understand correctly, a watermark won't necessarily
> >>>>> need to be a timestamp. I am concerned that the general configuration
> >>>>> key may not be enough to cover all the use cases and that we will need
> >>>>> to introduce a general way to determine the backlog status regardless.
> >>>>>
> >>>>> For the reasons above, I prefer introducing the configuration as is and
> >>>>> changing it later with a deprecation or migration process. What do you
> >>>>> think?
> >>>>>
> >>>>> Best,
> >>>>> Xuannan
> >>>>> On Aug 14, 2023, 14:09 +0800, Xintong Song <tonysong...@gmail.com> wrote:
> >>>>>> Thanks for the explanation.
> >>>>>>
> >>>>>> I wonder if it makes sense to not expose this detail via the
> >>>>>> configuration option. To be specific, I suggest not mentioning the
> >>>>>> "watermark" keyword in the configuration key and description.
> >>>>>>
> >>>>>> - From the users' perspective, I think they only need to know that
> >>>>>> when the lag is higher than the given threshold, Flink will consider
> >>>>>> the latency of individual records less important and prioritize
> >>>>>> throughput over it. They don't really need the details of how the lag
> >>>>>> is calculated.
> >>>>>> - For the internal implementation, I also think using watermark lag is
> >>>>>> a good idea, for the reasons you've already mentioned. However, it's
> >>>>>> not the only possible option. Hiding this detail from users would give
> >>>>>> us the flexibility to switch to other approaches if needed in the
> >>>>>> future.
> >>>>>> - We are currently working on designing the ProcessFunction API
> >>>>>> (consider it as a DataStream API V2). There's an idea to introduce a
> >>>>>> Generalized Watermark mechanism, where basically the watermark can be
> >>>>>> anything that needs to travel along the data-flow with certain
> >>>>>> alignment strategies, and event time watermark would be one specific
> >>>>>> case of it. This is still an idea and has not been discussed and agreed
> >>>>>> on by the community, and we are preparing a FLIP for it. But if we are
> >>>>>> going for it, the concept "watermark-lag-threshold" could be ambiguous.
> >>>>>>
> >>>>>> I do not intend to block the FLIP on this. I'd also be fine with
> >>>>>> introducing the configuration as is, and changing it later, if needed,
> >>>>>> with a regular deprecation and migration process. Just making my
> >>>>>> suggestions.
> >>>>>>
> >>>>>>
> >>>>>> Best,
> >>>>>>
> >>>>>> Xintong
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Mon, Aug 14, 2023 at 12:00 PM Xuannan Su <suxuanna...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Hi Xintong,
> >>>>>>>
> >>>>>>> Thanks for the reply.
> >>>>>>>
> >>>>>>> I have considered using the timestamp in the records to determine the
> >>>>>>> backlog status, and decided to use the watermark in the end. By
> >>>>>>> definition, the watermark is the indication of time progress in the
> >>>>>>> data stream. It indicates that the stream's event time has progressed
> >>>>>>> to some specific time. On the other hand, the timestamp in the
> >>>>>>> records is usually used to generate the watermark. Therefore, it
> >>>>>>> appears more appropriate and intuitive to calculate the event time
> >>>>>>> lag from the watermark and use it to determine the backlog status.
> >>>>>>> And by using the watermark, we can easily deal with out-of-order data
> >>>>>>> and idleness.
> >>>>>>>
> >>>>>>> Please let me know if you have further questions.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Xuannan
> >>>>>>> On Aug 10, 2023, 20:23 +0800, Xintong Song <tonysong...@gmail.com> wrote:
> >>>>>>>> Thanks for preparing the FLIP, Xuannan.
> >>>>>>>>
> >>>>>>>> +1 in general.
> >>>>>>>>
> >>>>>>>> A quick question: could you explain why we are relying on the
> >>>>>>>> watermark for emitting the record attribute? Why not use timestamps
> >>>>>>>> in the records? I don't see any concern in using watermarks. Just
> >>>>>>>> wondering if there are any deeper considerations behind this.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>>
> >>>>>>>> Xintong
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su <suxuanna...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hi all,
> >>>>>>>>>
> >>>>>>>>> I am opening this thread to discuss FLIP-328: Allow source
> >>>>>>>>> operators to determine isProcessingBacklog based on watermark
> >>>>>>>>> lag[1]. We had several discussions with Dong Ling about the design,
> >>>>>>>>> and thanks for all the valuable advice.
> >>>>>>>>>
> >>>>>>>>> The FLIP targets the use case where users want to run a Flink job
> >>>>>>>>> to backfill historical data in a high-throughput manner and then
> >>>>>>>>> continue processing real-time data with low latency. Building upon
> >>>>>>>>> the backlog concept introduced in FLIP-309[2], this proposal
> >>>>>>>>> enables sources to report their status of processing backlog based
> >>>>>>>>> on the watermark lag.
> >>>>>>>>>
> >>>>>>>>> We would greatly appreciate any comments or feedback you may have
> >>>>>>>>> on this proposal.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Xuannan
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> [1]
> >>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> >>>>>>>>> [2]
> >>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
>
>
>
