Hi, Xuannan. Thanks for preparing the FLIP.
After this FLIP, we will have two ways to report isProcessingBacklog: 1. From the source; 2. Judged by the watermark lag. What is the priority between them? For example, what is the isProcessingBacklog status when the source reports `isProcessingBacklog=false` and the watermark lag exceeds the threshold?

Best,
Hang

Xuannan Su <suxuanna...@gmail.com> wrote on Wed, Aug 30, 2023 at 10:06:

Hi Jing,

Thank you for the suggestion.

The definition of watermark lag is the same as the watermarkLag metric in FLIP-33 [1]. More specifically, the watermark lag is computed at the time a watermark is emitted downstream, in the following way: watermarkLag = CurrentTime - Watermark. I have added this description to the FLIP.

I hope this addresses your concern.

Best,
Xuannan

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics

On Aug 28, 2023, at 01:04, Jing Ge <j...@ververica.com.INVALID> wrote:

Hi Xuannan,

Thanks for the proposal. +1 for me.

There is one tiny thing that I am not sure I understand correctly. There will be many different WatermarkStrategies and different WatermarkGenerators. Could you please update the FLIP and add a description of how the watermark lag is calculated exactly? E.g. watermark lag = A - B, where A is the timestamp of the watermark emitted to the downstream and B is ... (this is the part I am not really sure about after reading the FLIP).

Best regards,
Jing

On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su <suxuanna...@gmail.com> wrote:

Hi Jark,

Thanks for the comments.

I agree that the current solution cannot support jobs that cannot define watermarks. However, after considering the pending-record-based solution, I believe the current solution is superior for the target use case, as it is more intuitive for users.
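To make the definition quoted above concrete, the lag computation and threshold check can be sketched as follows. This is an illustration only, not Flink's actual API; the function names `watermark_lag` and `is_processing_backlog` are hypothetical.

```python
import time

def watermark_lag(watermark_ms: int, now_ms: int) -> int:
    # Per FLIP-33: watermarkLag = CurrentTime - Watermark,
    # computed at the time a watermark is emitted downstream.
    return now_ms - watermark_ms

def is_processing_backlog(watermark_ms: int, threshold_ms: int, now_ms: int) -> bool:
    # The source is considered to be processing backlog when the
    # event-time lag exceeds the configured threshold.
    return watermark_lag(watermark_ms, now_ms) > threshold_ms

# Example: a watermark 2 hours behind wall time, with a 1-hour threshold.
now = int(time.time() * 1000)
assert is_processing_backlog(now - 2 * 3_600_000, 3_600_000, now)
assert not is_processing_backlog(now - 60_000, 3_600_000, now)
```

The point of the sketch is only that the decision is a simple comparison against a user-chosen threshold, which is what makes the trade-off intuitive to configure.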
The backlog status gives users the ability to balance between throughput and latency. Making this trade-off decision based on the watermark lag is more intuitive from the user's perspective. For instance, a user can decide that if the job lags behind the current time by more than 1 hour, the result is not usable. In that case, we can optimize for throughput when the data lags behind by more than an hour. With the pending-record-based solution, it is challenging for users to determine when to optimize for throughput and when to prioritize latency.

Regarding the limitations of the watermark-based solution:

1. The current solution can support jobs with sources that have event time. Users can always define a watermark at the source operator, even if it is not used by downstream operators, such as streaming join and unbounded aggregate.

2. I don't believe it is accurate to say that the watermark lag will keep increasing if no data is generated in Kafka. The watermark lag and backlog status are determined at the moment when the watermark is emitted to the downstream operator. If no data is emitted from the source, the watermark lag and backlog status will not be updated. If a WatermarkStrategy with idleness is used, the source becomes non-backlog when it becomes idle.

3. I think watermark lag is more intuitive for determining whether a job is processing backlog data. Even when using pending records, we face a similar issue. For example, if the source has 1K pending records, those records can span 1 day, 1 hour, or 1 second. If the records span 1 day, it is probably best to optimize for throughput. If they span 1 hour, it depends on the business logic. If they span 1 second, optimizing for latency is likely the better choice.
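The evaluation semantics described in point 2 above can be modeled roughly as follows. This is an illustrative sketch, not Flink's implementation: the class and method names are hypothetical, and the initial non-backlog status is an assumption for the example.

```python
class BacklogTracker:
    """Toy model: backlog status is re-evaluated only when a watermark
    is emitted downstream, so it cannot drift while no data flows."""

    def __init__(self, threshold_ms: int):
        self.threshold_ms = threshold_ms
        self.backlog = False  # assumed initial status for this sketch

    def on_watermark(self, watermark_ms: int, now_ms: int) -> bool:
        # Lag is computed at emission time: watermarkLag = CurrentTime - Watermark.
        self.backlog = (now_ms - watermark_ms) > self.threshold_ms
        return self.backlog

    def on_idle(self) -> bool:
        # With an idleness-aware WatermarkStrategy, an idle source is
        # treated as non-backlog (there is nothing left to catch up on).
        self.backlog = False
        return self.backlog

tracker = BacklogTracker(threshold_ms=3_600_000)  # 1-hour threshold
now = 1_000_000_000_000
assert tracker.on_watermark(now - 7_200_000, now)        # 2h behind -> backlog
assert not tracker.on_idle()                             # idle -> non-backlog
assert not tracker.on_watermark(now - 1_000, now)        # caught up
```

Between watermark emissions the status simply stays at its last value, which is the behavior the reply relies on to argue that an empty Kafka topic does not produce an ever-growing "backlog" signal.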
In summary, I believe the watermark-based solution is a superior choice for the target use case where watermark/event time can be defined. Additionally, I haven't come across a scenario that requires low-latency processing and reads from a source that cannot define watermarks. If we encounter such a use case, we can create another FLIP to address those needs in the future. What do you think?

Best,
Xuannan

On Aug 20, 2023, at 23:27, Jark Wu <imj...@gmail.com> wrote:

Hi Xuannan,

Thanks for opening this discussion.

The current proposal may work in the mentioned watermark cases. However, it seems this is not a general solution for sources to determine "isProcessingBacklog". From my point of view, there are 3 limitations of the current proposal:

1. It doesn't cover jobs that don't have watermark/event-time defined, for example streaming join and unbounded aggregate. We may still need to figure out solutions for them.
2. Watermark lag cannot be trusted, because it increases without limit if no data is generated in Kafka. But in this case, there is no backlog at all.
3. Watermark lag hardly reflects the amount of backlog. If the watermark lag is 1 day or 1 hour or 1 second, there is possibly only 1 pending record there, which means no backlog at all.

Therefore, IMO, watermark may not be the ideal metric to determine "isProcessingBacklog". What we need is something that reflects the number of records unprocessed by the job. Actually, that is the "pendingRecords" metric proposed in FLIP-33 [1] and already implemented by the Kafka source. Did you consider using the "pendingRecords" metric to determine "isProcessingBacklog"?
Best,
Jark

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics

On Tue, 15 Aug 2023 at 12:04, Xintong Song <tonysong...@gmail.com> wrote:

Sounds good to me.

It is true that, if we are introducing the generalized watermark, there will be other watermark-related concepts / configurations that need to be updated anyway.

Best,
Xintong

On Tue, Aug 15, 2023 at 11:30 AM Xuannan Su <suxuanna...@gmail.com> wrote:

Hi Xintong,

Thank you for your suggestion.

After considering the idea of using a general configuration key, I think it may not be a good idea, for the reasons below.

While I agree that using a more general configuration key provides us with the flexibility to switch to other approaches to calculate the lag in the future, the downside is that it may cause confusion for users. We currently have fetchEventTimeLag, emitEventTimeLag, and watermarkLag in the source, and it would not be clear which specific lag we are referring to. With the potential introduction of the Generalized Watermark mechanism in the future, if I understand correctly, a watermark won't necessarily need to be a timestamp. I am concerned that the general configuration key may not be enough to cover all the use cases, and we would need to introduce a general way to determine the backlog status regardless.
For the reasons above, I prefer introducing the configuration as is, and changing it later with a deprecation or migration process. What do you think?

Best,
Xuannan

On Aug 14, 2023, 14:09 +0800, Xintong Song <tonysong...@gmail.com> wrote:

Thanks for the explanation.

I wonder if it makes sense to not expose this detail via the configuration option. To be specific, I suggest not mentioning the "watermark" keyword in the configuration key and description.

- From the users' perspective, I think they only need to know that if there's a lag higher than the given threshold, Flink will consider the latency of individual records as less important and prioritize throughput over it. They don't really need the details of how the lags are calculated.
- For the internal implementation, I also think using watermark lags is a good idea, for the reasons you've already mentioned. However, it's not the only possible option. Hiding this detail from users would give us the flexibility to switch to other approaches if needed in the future.
- We are currently working on designing the ProcessFunction API (consider it as a DataStream API V2). There's an idea to introduce a Generalized Watermark mechanism, where basically the watermark can be anything that needs to travel along the data-flow with certain alignment strategies, and the event-time watermark would be one specific case of it. This is still an idea that has not been discussed and agreed on by the community, and we are preparing a FLIP for it. But if we go for it, the concept "watermark-lag-threshold" could be ambiguous.
I do not intend to block the FLIP on this. I'd also be fine with introducing the configuration as is, and changing it later, if needed, with a regular deprecation and migration process. Just making my suggestions.

Best,
Xintong

On Mon, Aug 14, 2023 at 12:00 PM Xuannan Su <suxuanna...@gmail.com> wrote:

Hi Xintong,

Thanks for the reply.

I considered using the timestamps in the records to determine the backlog status, and decided to use the watermark in the end. By definition, the watermark is the time progress indication in the data stream. It indicates that the stream's event time has progressed to some specific time. On the other hand, the timestamps in the records are usually used to generate the watermark. Therefore, it appears more appropriate and intuitive to calculate the event-time lag from the watermark and determine the backlog status. And by using the watermark, we can easily deal with out-of-order data and idleness.

Please let me know if you have further questions.

Best,
Xuannan

On Aug 10, 2023, 20:23 +0800, Xintong Song <tonysong...@gmail.com> wrote:

Thanks for preparing the FLIP, Xuannan.

+1 in general.

A quick question: could you explain why we are relying on the watermark for emitting the record attribute? Why not use the timestamps in the records? I don't see any concern in using watermarks. Just wondering if there are any deep considerations behind this.
Best,
Xintong

On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su <suxuanna...@gmail.com> wrote:

Hi all,

I am opening this thread to discuss FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag [1]. We had several discussions with Dong Ling about the design, and thanks for all the valuable advice.

The FLIP targets the use case where users want to run a Flink job to backfill historical data in a high-throughput manner and then continue processing real-time data with low latency. Building upon the backlog concept introduced in FLIP-309 [2], this proposal enables sources to report their status of processing backlog based on the watermark lag.

We would greatly appreciate any comments or feedback you may have on this proposal.

Best,
Xuannan

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog