Hi Jing,

Thank you for the suggestion.

The definition of watermark lag is the same as the watermarkLag metric in FLIP-33 [1]. More specifically, the watermark lag is computed at the time a watermark is emitted downstream, as follows: watermarkLag = CurrentTime - Watermark. I have added this description to the FLIP. I hope this addresses your concern.

Best,
Xuannan

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics

> On Aug 28, 2023, at 01:04, Jing Ge <j...@ververica.com.INVALID> wrote:
>
> Hi Xuannan,
>
> Thanks for the proposal. +1 for me.
>
> There is one tiny thing that I am not sure I understand correctly.
> Since there will be many different WatermarkStrategies and different
> WatermarkGenerators, could you please update the FLIP and add a
> description of how the watermark lag is calculated exactly? E.g.,
> watermark lag = A - B, where A is the timestamp of the watermark emitted
> to the downstream and B is ... (this is the part I am not really sure
> about after reading the FLIP).
>
> Best regards,
> Jing
>
>
> On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su <suxuanna...@gmail.com> wrote:
>
>> Hi Jark,
>>
>> Thanks for the comments.
>>
>> I agree that the current solution cannot support jobs that cannot define
>> watermarks. However, after considering the pending-record-based solution,
>> I believe the current solution is superior for the target use case, as it
>> is more intuitive for users. The backlog status gives users the ability
>> to balance between throughput and latency. Making this trade-off decision
>> based on the watermark lag is more intuitive from the user's perspective.
>> For instance, a user can decide that if the job lags behind the current
>> time by more than 1 hour, the result is not usable. In that case, we can
>> optimize for throughput when the data lags behind by more than an hour.
>> With the pending-record-based solution, it's challenging for users to
>> determine when to optimize for throughput and when to prioritize latency.
>>
>> Regarding the limitations of the watermark-based solution:
>>
>> 1. The current solution can support jobs with sources that have event
>> time. Users can always define a watermark at the source operator, even
>> if it's not used by downstream operators, such as streaming join and
>> unbounded aggregate.
>>
>> 2. I don't believe it's accurate to say that the watermark lag will keep
>> increasing if no data is generated in Kafka. The watermark lag and
>> backlog status are determined at the moment when the watermark is
>> emitted to the downstream operator. If no data is emitted from the
>> source, the watermark lag and backlog status will not be updated. If a
>> WatermarkStrategy with idleness is used, the source becomes non-backlog
>> when it becomes idle.
>>
>> 3. I think watermark lag is more intuitive for determining whether a job
>> is processing backlog data. Even when using pending records, we face a
>> similar issue. For example, if the source has 1K pending records, those
>> records can span anywhere from 1 day to 1 hour to 1 second. If the
>> records span 1 day, it's probably best to optimize for throughput. If
>> they span 1 hour, it depends on the business logic. If they span 1
>> second, optimizing for latency is likely the better choice.
>>
>> In summary, I believe the watermark-based solution is a superior choice
>> for the target use case where a watermark/event time can be defined.
>> Additionally, I haven't come across a scenario that requires low-latency
>> processing and reads from a source that cannot define watermarks. If we
>> encounter such a use case, we can create another FLIP to address those
>> needs in the future. What do you think?
>>
>>
>> Best,
>> Xuannan
>>
>>
>>
>>> On Aug 20, 2023, at 23:27, Jark Wu <imj...@gmail.com> wrote:
>>>
>>> Hi Xuannan,
>>>
>>> Thanks for opening this discussion.
>>>
>>> The current proposal may work in the mentioned watermark cases.
>>> However, it seems this is not a general solution for sources to
>>> determine "isProcessingBacklog".
>>> From my point of view, there are 3 limitations of the current proposal:
>>> 1. It doesn't cover jobs that don't have watermark/event-time defined,
>>> for example streaming join and unbounded aggregate. We may still need
>>> to figure out solutions for them.
>>> 2. Watermark lag cannot be trusted, because it increases without bound
>>> if no data is generated in Kafka. But in this case, there is no backlog
>>> at all.
>>> 3. Watermark lag hardly reflects the amount of backlog. Whether the
>>> watermark lag is 1 day, 1 hour, or 1 second, there is possibly only 1
>>> pending record there, which means no backlog at all.
>>>
>>> Therefore, IMO, watermark may not be the ideal metric to determine
>>> "isProcessingBacklog".
>>> What we need is something that reflects the number of records
>>> unprocessed by the job.
>>> Actually, that is the "pendingRecords" metric proposed in FLIP-33, which
>>> has been implemented by the Kafka source.
>>> Did you consider using the "pendingRecords" metric to determine
>>> "isProcessingBacklog"?
>>>
>>> Best,
>>> Jark
>>>
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>>>
>>>
>>>
>>> On Tue, 15 Aug 2023 at 12:04, Xintong Song <tonysong...@gmail.com> wrote:
>>>
>>>> Sounds good to me.
>>>>
>>>> It is true that, if we are introducing the generalized watermark, there
>>>> will be other watermark-related concepts / configurations that need to
>>>> be updated anyway.
>>>>
>>>>
>>>> Best,
>>>>
>>>> Xintong
>>>>
>>>>
>>>>
>>>> On Tue, Aug 15, 2023 at 11:30 AM Xuannan Su <suxuanna...@gmail.com> wrote:
>>>>
>>>>> Hi Xintong,
>>>>>
>>>>> Thank you for your suggestion.
>>>>>
>>>>> After considering the idea of using a general configuration key, I
>>>>> think it may not be a good idea, for the reasons below.
>>>>>
>>>>> While I agree that using a more general configuration key provides us
>>>>> with the flexibility to switch to other approaches to calculate the
>>>>> lag in the future, the downside is that it may cause confusion for
>>>>> users. We currently have fetchEventTimeLag, emitEventTimeLag, and
>>>>> watermarkLag in the source, and it is not clear which specific lag we
>>>>> are referring to. With the potential introduction of the Generalized
>>>>> Watermark mechanism in the future, if I understand correctly, a
>>>>> watermark won't necessarily need to be a timestamp. I am concerned
>>>>> that the general configuration key may not be enough to cover all the
>>>>> use cases, and we will need to introduce a general way to determine
>>>>> the backlog status regardless.
>>>>>
>>>>> For the reasons above, I prefer introducing the configuration as is,
>>>>> and changing it later with a deprecation or migration process. What
>>>>> do you think?
>>>>>
>>>>> Best,
>>>>> Xuannan
>>>>> On Aug 14, 2023, 14:09 +0800, Xintong Song <tonysong...@gmail.com>, wrote:
>>>>>> Thanks for the explanation.
>>>>>>
>>>>>> I wonder if it makes sense to not expose this detail via the
>>>>>> configuration option. To be specific, I suggest not mentioning the
>>>>>> "watermark" keyword in the configuration key and description.
>>>>>>
>>>>>> - From the users' perspective, I think they only need to know that
>>>>>> when there's a lag higher than the given threshold, Flink will
>>>>>> consider the latency of individual records as less important and
>>>>>> prioritize throughput over it. They don't really need the details of
>>>>>> how the lags are calculated.
>>>>>> - For the internal implementation, I also think using watermark lags
>>>>>> is a good idea, for the reasons you've already mentioned. However,
>>>>>> it's not the only possible option. Hiding this detail from users
>>>>>> would give us the flexibility to switch to other approaches if
>>>>>> needed in the future.
>>>>>> - We are currently working on designing the ProcessFunction API
>>>>>> (consider it a DataStream API V2). There's an idea to introduce a
>>>>>> Generalized Watermark mechanism, where basically the watermark can
>>>>>> be anything that needs to travel along the data flow with certain
>>>>>> alignment strategies, and the event time watermark would be one
>>>>>> specific case of it. This is still an idea that has not been
>>>>>> discussed and agreed on by the community, and we are preparing a
>>>>>> FLIP for it. But if we go for it, the concept
>>>>>> "watermark-lag-threshold" could be ambiguous.
>>>>>>
>>>>>> I do not intend to block the FLIP on this. I'd also be fine with
>>>>>> introducing the configuration as is, and changing it later, if
>>>>>> needed, with a regular deprecation and migration process. Just
>>>>>> making my suggestions.
>>>>>>
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Xintong
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Aug 14, 2023 at 12:00 PM Xuannan Su <suxuanna...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Xintong,
>>>>>>>
>>>>>>> Thanks for the reply.
>>>>>>>
>>>>>>> I have considered using the timestamp in the records to determine
>>>>>>> the backlog status, and decided to use the watermark in the end.
>>>>>>> By definition, the watermark is the time progress indication in the
>>>>>>> data stream. It indicates that the stream's event time has
>>>>>>> progressed to some specific time. On the other hand, the timestamp
>>>>>>> in the records is usually used to generate the watermark. Therefore,
>>>>>>> it appears more appropriate and intuitive to calculate the event
>>>>>>> time lag from the watermark and determine the backlog status from
>>>>>>> it. And by using the watermark, we can easily deal with out-of-order
>>>>>>> data and idleness of the data.
>>>>>>>
>>>>>>> Please let me know if you have further questions.
>>>>>>>
>>>>>>> Best,
>>>>>>> Xuannan
>>>>>>> On Aug 10, 2023, 20:23 +0800, Xintong Song <tonysong...@gmail.com>, wrote:
>>>>>>>> Thanks for preparing the FLIP, Xuannan.
>>>>>>>>
>>>>>>>> +1 in general.
>>>>>>>>
>>>>>>>> A quick question: could you explain why we are relying on the
>>>>>>>> watermark for emitting the record attribute? Why not use the
>>>>>>>> timestamps in the records? I don't see any concern with using
>>>>>>>> watermarks. Just wondering if there are any deep considerations
>>>>>>>> behind this.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>>
>>>>>>>> Xintong
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su <suxuanna...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> I am opening this thread to discuss FLIP-328: Allow source
>>>>>>>>> operators to determine isProcessingBacklog based on watermark
>>>>>>>>> lag [1]. We had several discussions with Dong Ling about the
>>>>>>>>> design, and thanks for all the valuable advice.
>>>>>>>>>
>>>>>>>>> The FLIP aims to target the use case where users want to run a
>>>>>>>>> Flink job to backfill historical data in a high-throughput manner
>>>>>>>>> and continue processing real-time data with low latency.
>>>>>>>>> Building upon the backlog concept introduced in FLIP-309 [2],
>>>>>>>>> this proposal enables sources to report their backlog-processing
>>>>>>>>> status based on the watermark lag.
>>>>>>>>>
>>>>>>>>> We would greatly appreciate any comments or feedback you may have
>>>>>>>>> on this proposal.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Xuannan
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
>>>>>>>>> [2]
>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
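For readers skimming the thread, the lag formula and the threshold-based backlog decision discussed above can be sketched as follows. This is a hypothetical illustration, not the actual FLIP-328 implementation: the class and method names (`BacklogStatusSketch`, `watermarkLag`, `isProcessingBacklog`) are made up for this sketch, and only the formula watermarkLag = CurrentTime - Watermark (from FLIP-33) and the idea of comparing the lag to a user-configured threshold come from the discussion.

```java
// Hypothetical sketch, NOT the FLIP-328 implementation. It shows the
// watermark-lag formula (watermarkLag = CurrentTime - Watermark, evaluated
// when a watermark is emitted downstream) and a threshold-based decision
// for isProcessingBacklog, as discussed in the thread.
public class BacklogStatusSketch {

    // Threshold above which the source reports that it is processing
    // backlog, e.g. 1 hour as in the example from the thread.
    private final long lagThresholdMs;

    public BacklogStatusSketch(long lagThresholdMs) {
        this.lagThresholdMs = lagThresholdMs;
    }

    // watermarkLag = CurrentTime - Watermark (FLIP-33 definition).
    public long watermarkLag(long currentTimeMs, long watermarkMs) {
        return currentTimeMs - watermarkMs;
    }

    // Evaluated at the moment a watermark is emitted downstream; the
    // status is only updated when a watermark is actually emitted, which
    // is why the reported lag does not grow while the source emits no data.
    public boolean isProcessingBacklog(long currentTimeMs, long watermarkMs) {
        return watermarkLag(currentTimeMs, watermarkMs) > lagThresholdMs;
    }

    public static void main(String[] args) {
        long oneHourMs = 3_600_000L;
        BacklogStatusSketch sketch = new BacklogStatusSketch(oneHourMs);
        long now = System.currentTimeMillis();
        // Backfilling: watermark 2 hours behind -> backlog, favor throughput.
        System.out.println(sketch.isProcessingBacklog(now, now - 2 * oneHourMs));
        // Near real time: watermark 1 second behind -> not backlog, favor latency.
        System.out.println(sketch.isProcessingBacklog(now, now - 1_000L));
    }
}
```

Note that, per point 2 in the thread, a real source would only evaluate this at watermark-emission time, and an idleness-aware WatermarkStrategy would separately mark the source non-backlog when idle.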