Hi Jing,

Thank you for the clarification.

For the use case you mentioned, I believe we can utilize the HybridSource, as updated in FLIP-309[1], to determine the backlog status. For example, if the user wants to process data before time T in batch mode and after time T in stream mode, they can set the first source of the HybridSource to read up to time T and the last source of the HybridSource to read from time T.

Best,
Xuannan

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog

On Mon, Sep 4, 2023 at 10:36 PM Jing Ge <j...@ververica.com.invalid> wrote:
>
> Hi Xuannan,
>
> Thanks for the clarification.
>
> 3. Event time and processing time are two different things. It might be rarely used, but conceptually, users can process data in the past within a specific time range in streaming mode. All data before that range will be considered backlog and needs to be processed in batch mode, like, e.g., the Present Perfect Progressive tense used in the English language.
>
> Best regards,
> Jing
>
> On Thu, Aug 31, 2023 at 4:45 AM Xuannan Su <suxuanna...@gmail.com> wrote:
>
> > Hi Jing,
> >
> > Thanks for the reply.
> >
> > 1. You are absolutely right that the watermark lag threshold must be carefully set with a thorough understanding of watermark generation. It is crucial for users to take the WatermarkStrategy into account when setting the watermark lag threshold.
> >
> > 2. Regarding pure processing-time based stream processing jobs, alternative strategies will be implemented to determine whether the job is processing backlog data. I have outlined two possible strategies below:
> >
> > - Based on the source operator's state. For example, when the MySQL CDC source is reading a snapshot, it can claim isBacklog=true.
> > - Based on metrics. For example, when busyTimeMsPerSecond (or backPressuredTimeMsPerSecond) > user_specified_threshold, then isBacklog=true.
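The metric-based strategy above, together with the watermark-lag rule discussed throughout this thread (watermarkLag = CurrentTime - Watermark), boils down to simple threshold predicates. A rough plain-Java sketch follows; the class, method, and parameter names are hypothetical illustrations, not Flink API — only the metric names come from the thread:

```java
// Sketch of the two threshold-style backlog checks discussed in this thread.
// NOT Flink API; names are hypothetical, only the metric semantics are quoted.
final class BacklogHeuristics {

    // Metric-based strategy: claim isBacklog=true while the operator is busy
    // (or back-pressured) for longer than a user-specified threshold.
    static boolean isBacklogByMetric(double busyTimeMsPerSecond, double thresholdMsPerSecond) {
        return busyTimeMsPerSecond > thresholdMsPerSecond;
    }

    // Watermark-based strategy (FLIP-33 definition): watermarkLag = CurrentTime
    // - Watermark, computed when a watermark is emitted downstream; backlog
    // means the lag exceeds the user-specified threshold.
    static boolean isBacklogByWatermarkLag(long currentTimeMs, long watermarkMs, long lagThresholdMs) {
        return currentTimeMs - watermarkMs > lagThresholdMs;
    }
}
```

Either check would be evaluated per source, and FLIP-309's checkpointing behavior then keys off the reported isProcessingBacklog flag.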
> >
> > As for the strategies proposed in this FLIP, they rely on generated watermarks. Therefore, if a user intends for the job to detect the backlog status based on the watermark, it is necessary to generate the watermark.
> >
> > 3. I'm afraid I'm not fully grasping your question. From my understanding, it should work in both cases. When event times are close to the processing time, resulting in watermarks close to the processing time, the job is not processing backlog data. On the other hand, when event times are far from the processing time, causing watermarks to also be distant, if the lag surpasses the defined threshold, the job is considered to be processing backlog data.
> >
> > Best,
> > Xuannan
> >
> > > On Aug 31, 2023, at 02:56, Jing Ge <j...@ververica.com.INVALID> wrote:
> > >
> > > Hi Xuannan,
> > >
> > > Thanks for the clarification. That is the part where I am trying to understand your thoughts. I have some follow-up questions:
> > >
> > > 1. It depends strongly on the WatermarkStrategy and what customized watermark generation looks like. It mixes business logic with the technical implementation and the technical data processing mode. The value of the watermark lag threshold must be set very carefully: if the value is too small, any time the watermark generation logic is changed (business logic changes lead to the threshold being exceeded), the same job might surprisingly be running in backlog processing mode, i.e. a butterfly effect. Comprehensive documentation is required to avoid any confusion for the users.
> > > 2. Like Jark already mentioned, use cases that do not have watermarks, like pure processing-time based stream processing[1], are not covered. It is more or less a trade-off solution that does not support such use cases, and appropriate documentation is required. Forcing them to explicitly generate watermarks that are never needed just because of this does not sound like a proper solution.
> > > 3. If I am not mistaken, it only works for use cases where event times are very close to the processing times, because the wall clock is used to calculate the watermark lag and the watermark is generated based on the event time.
> > >
> > > Best regards,
> > > Jing
> > >
> > > [1] https://github.com/apache/flink/blob/2c50b4e956305426f478b726d4de4a640a16b810/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L236
> > >
> > > On Wed, Aug 30, 2023 at 4:06 AM Xuannan Su <suxuanna...@gmail.com> wrote:
> > >
> > > > Hi Jing,
> > > >
> > > > Thank you for the suggestion.
> > > >
> > > > The definition of watermark lag is the same as the watermarkLag metric in FLIP-33[1]. More specifically, the watermark lag is computed at the time when a watermark is emitted downstream in the following way: watermarkLag = CurrentTime - Watermark. I have added this description to the FLIP.
> > > >
> > > > I hope this addresses your concern.
> > > >
> > > > Best,
> > > > Xuannan
> > > >
> > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > >
> > > > > On Aug 28, 2023, at 01:04, Jing Ge <j...@ververica.com.INVALID> wrote:
> > > > >
> > > > > Hi Xuannan,
> > > > >
> > > > > Thanks for the proposal. +1 for me.
> > > > >
> > > > > There is one tiny thing that I am not sure I understand correctly: since there will be many different WatermarkStrategies and different WatermarkGenerators, could you please update the FLIP and add a description of how the watermark lag is calculated exactly? E.g. watermark lag = A - B, where A is the timestamp of the watermark emitted to the downstream and B is....(this is the part I am not really sure about after reading the FLIP).
> > > > >
> > > > > Best regards,
> > > > > Jing
> > > > >
> > > > > On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su <suxuanna...@gmail.com> wrote:
> > > > >
> > > > > > Hi Jark,
> > > > > >
> > > > > > Thanks for the comments.
> > > > > >
> > > > > > I agree that the current solution cannot support jobs that cannot define watermarks. However, after considering the pending-record-based solution, I believe the current solution is superior for the target use case as it is more intuitive for users. The backlog status gives users the ability to balance between throughput and latency. Making this trade-off decision based on the watermark lag is more intuitive from the user's perspective. For instance, a user can decide that if the job lags behind the current time by more than 1 hour, the result is not usable. In that case, we can optimize for throughput when the data lags behind by more than an hour. With the pending-record-based solution, it's challenging for users to determine when to optimize for throughput and when to prioritize latency.
> > > > > >
> > > > > > Regarding the limitations of the watermark-based solution:
> > > > > >
> > > > > > 1. The current solution can support jobs with sources that have event time. Users can always define a watermark at the source operator, even if it's not used by downstream operators, such as streaming join and unbounded aggregate.
> > > > > >
> > > > > > 2. I don't believe it's accurate to say that the watermark lag will keep increasing if no data is generated in Kafka. The watermark lag and backlog status are determined at the moment when the watermark is emitted to the downstream operator. If no data is emitted from the source, the watermark lag and backlog status will not be updated. If a WatermarkStrategy with idleness is used, the source becomes non-backlog when it becomes idle.
> > > > > >
> > > > > > 3. I think watermark lag is more intuitive for determining if a job is processing backlog data. Even when using pending records, it faces a similar issue. For example, if the source has 1K pending records, those records can span from 1 day to 1 hour to 1 second. If the records span 1 day, it's probably best to optimize for throughput. If they span 1 hour, it depends on the business logic. If they span 1 second, optimizing for latency is likely the better choice.
> > > > > >
> > > > > > In summary, I believe the watermark-based solution is a superior choice for the target use case where watermark/event time can be defined. Additionally, I haven't come across a scenario that requires low-latency processing and reads from a source that cannot define watermarks. If we encounter such a use case, we can create another FLIP to address those needs in the future. What do you think?
> > > > > >
> > > > > > Best,
> > > > > > Xuannan
> > > > > >
> > > > > > > On Aug 20, 2023, at 23:27, Jark Wu <imj...@gmail.com> wrote:
> > > > > > >
> > > > > > > Hi Xuannan,
> > > > > > >
> > > > > > > Thanks for opening this discussion.
> > > > > > >
> > > > > > > The current proposal may work in the mentioned watermark cases. However, it seems this is not a general solution for sources to determine "isProcessingBacklog". From my point of view, there are 3 limitations of the current proposal:
> > > > > > > 1. It doesn't cover jobs that don't have watermark/event-time defined, for example streaming join and unbounded aggregate. We may still need to figure out solutions for them.
> > > > > > > 2. Watermark lag cannot be trusted, because it increases without limit if no data is generated in Kafka. But in this case, there is no backlog at all.
> > > > > > > 3. Watermark lag hardly reflects the amount of backlog. If the watermark lag is 1 day or 1 hour or 1 second, there is possibly only 1 pending record there, which means no backlog at all.
> > > > > > >
> > > > > > > Therefore, IMO, watermark may not be the ideal metric used to determine "isProcessingBacklog". What we need is something that reflects the number of records unprocessed by the job. Actually, that is the "pendingRecords" metric proposed in FLIP-33[1] and already implemented by the Kafka source. Did you consider using the "pendingRecords" metric to determine "isProcessingBacklog"?
> > > > > > >
> > > > > > > Best,
> > > > > > > Jark
> > > > > > >
> > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > > > >
> > > > > > > On Tue, 15 Aug 2023 at 12:04, Xintong Song <tonysong...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Sounds good to me.
> > > > > > > >
> > > > > > > > It is true that, if we are introducing the generalized watermark, there will be other watermark-related concepts / configurations that need to be updated anyway.
> > >>>>>> > > >>>>>> > > >>>>>> Best, > > >>>>>> > > >>>>>> Xintong > > >>>>>> > > >>>>>> > > >>>>>> > > >>>>>> On Tue, Aug 15, 2023 at 11:30 AM Xuannan Su <suxuanna...@gmail.com > > >>>> <mailto:suxuanna...@gmail.com>> wrote: > > >>>>>> > > >>>>>>> Hi Xingtong, > > >>>>>>> > > >>>>>>> Thank you for your suggestion. > > >>>>>>> > > >>>>>>> After considering the idea of using a general configuration key, I > > >>>> think > > >>>>>>> it may not be a good idea for the reasons below. > > >>>>>>> > > >>>>>>> While I agree that using a more general configuration key provides > > us > > >>>>>> with > > >>>>>>> the flexibility to switch to other approaches to calculate the lag > > in > > >>>> the > > >>>>>>> future, the downside is that it may cause confusion for users. We > > >>>>>> currently > > >>>>>>> have fetchEventTimeLag, emitEventTimeLag, and watermarkLag in the > > >>>> source, > > >>>>>>> and it is not clear which specific lag we are referring to. With > > the > > >>>>>>> potential introduction of the Generalized Watermark mechanism in > > the > > >>>>>>> future, if I understand correctly, a watermark won't necessarily > > need > > >>>> to > > >>>>>> be > > >>>>>>> a timestamp. I am concern that the general configuration key may > > not > > >>>> be > > >>>>>>> enough to cover all the use case and we will need to introduce a > > >>>> general > > >>>>>>> way to determine the backlog status regardless. > > >>>>>>> > > >>>>>>> For the reasons above, I prefer introducing the configuration as > > is, > > >>>> and > > >>>>>>> change it later with the a deprecation process or migration > > process. > > >>>> What > > >>>>>>> do you think? > > >>>>>>> > > >>>>>>> Best, > > >>>>>>> Xuannan > > >>>>>>> On Aug 14, 2023, 14:09 +0800, Xintong Song <tonysong...@gmail.com > > >>>> <mailto:tonysong...@gmail.com>>, > > >>>>>> wrote: > > >>>>>>>> Thanks for the explanation. 
> > >>>>>>>> > > >>>>>>>> I wonder if it makes sense to not expose this detail via the > > >>>>>>> configuration > > >>>>>>>> option. To be specific, I suggest not mentioning the "watermark" > > >>>>>> keyword > > >>>>>>> in > > >>>>>>>> the configuration key and description. > > >>>>>>>> > > >>>>>>>> - From the users' perspective, I think they only need to know > > >> there's > > >>>> a > > >>>>>>>> lag higher than the given threshold, Flink will consider latency > > of > > >>>>>>>> individual records as less important and prioritize throughput > > over > > >>>> it. > > >>>>>>>> They don't really need the details of how the lags are calculated. > > >>>>>>>> - For the internal implementation, I also think using watermark > > lags > > >>>> is > > >>>>>>>> a good idea, for the reasons you've already mentioned. However, > > it's > > >>>>>> not > > >>>>>>>> the only possible option. Hiding this detail from users would give > > >> us > > >>>>>> the > > >>>>>>>> flexibility to switch to other approaches if needed in future. > > >>>>>>>> - We are currently working on designing the ProcessFunction API > > >>>>>>>> (consider it as a DataStream API V2). There's an idea to > > introduce a > > >>>>>>>> Generalized Watermark mechanism, where basically the watermark can > > >> be > > >>>>>>>> anything that needs to travel along the data-flow with certain > > >>>>>> alignment > > >>>>>>>> strategies, and event time watermark would be one specific case of > > >> it. > > >>>>>>> This > > >>>>>>>> is still an idea and has not been discussed and agreed on by the > > >>>>>>> community, > > >>>>>>>> and we are preparing a FLIP for it. But if we are going for it, > > the > > >>>>>>> concept > > >>>>>>>> "watermark-lag-threshold" could be ambiguous. > > >>>>>>>> > > >>>>>>>> I do not intend to block the FLIP on this. 
I'd also be fine with > > >>>>>>>> introducing the configuration as is, and changing it later, if > > >> needed, > > >>>>>>> with > > >>>>>>>> a regular deprecation and migration process. Just making my > > >>>>>> suggestions. > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> Best, > > >>>>>>>> > > >>>>>>>> Xintong > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> On Mon, Aug 14, 2023 at 12:00 PM Xuannan Su < > > suxuanna...@gmail.com > > >>>> <mailto:suxuanna...@gmail.com>> > > >>>>>>> wrote: > > >>>>>>>> > > >>>>>>>>> Hi Xintong, > > >>>>>>>>> > > >>>>>>>>> Thanks for the reply. > > >>>>>>>>> > > >>>>>>>>> I have considered using the timestamp in the records to determine > > >> the > > >>>>>>>>> backlog status, and decided to use watermark at the end. By > > >>>>>> definition, > > >>>>>>>>> watermark is the time progress indication in the data stream. It > > >>>>>>> indicates > > >>>>>>>>> the stream’s event time has progressed to some specific time. On > > >> the > > >>>>>>> other > > >>>>>>>>> hand, timestamp in the records is usually used to generate the > > >>>>>>> watermark. > > >>>>>>>>> Therefore, it appears more appropriate and intuitive to calculate > > >> the > > >>>>>>> event > > >>>>>>>>> time lag by watermark and determine the backlog status. And by > > >> using > > >>>>>>> the > > >>>>>>>>> watermark, we can easily deal with the out-of-order and the > > >> idleness > > >>>>>>> of the > > >>>>>>>>> data. > > >>>>>>>>> > > >>>>>>>>> Please let me know if you have further questions. > > >>>>>>>>> > > >>>>>>>>> Best, > > >>>>>>>>> Xuannan > > >>>>>>>>> On Aug 10, 2023, 20:23 +0800, Xintong Song < > > tonysong...@gmail.com > > >>>> <mailto:tonysong...@gmail.com>>, > > >>>>>>> wrote: > > >>>>>>>>>> Thanks for preparing the FLIP, Xuannan. > > >>>>>>>>>> > > >>>>>>>>>> +1 in general. > > >>>>>>>>>> > > >>>>>>>>>> A quick question, could you explain why we are relying on the > > >>>>>>> watermark > > >>>>>>>>> for > > >>>>>>>>>> emitting the record attribute? 
Why not use timestamps in the > > >>>>>>> records? I > > >>>>>>>>>> don't see any concern in using watermarks. Just wondering if > > >>>>>> there's > > >>>>>>> any > > >>>>>>>>>> deep considerations behind this. > > >>>>>>>>>> > > >>>>>>>>>> Best, > > >>>>>>>>>> > > >>>>>>>>>> Xintong > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su < > > suxuanna...@gmail.com > > >>>> <mailto:suxuanna...@gmail.com>> > > >>>>>>> wrote: > > >>>>>>>>>> > > >>>>>>>>>>> Hi all, > > >>>>>>>>>>> > > >>>>>>>>>>> I am opening this thread to discuss FLIP-328: Allow source > > >>>>>>> operators to > > >>>>>>>>>>> determine isProcessingBacklog based on watermark lag[1]. We > > had a > > >>>>>>>>> several > > >>>>>>>>>>> discussions with Dong Ling about the design, and thanks for all > > >>>>>> the > > >>>>>>>>>>> valuable advice. > > >>>>>>>>>>> > > >>>>>>>>>>> The FLIP aims to target the use-case where user want to run a > > >>>>>> Flink > > >>>>>>>>> job to > > >>>>>>>>>>> backfill historical data in a high throughput manner and > > continue > > >>>>>>>>>>> processing real-time data with low latency. Building upon the > > >>>>>>> backlog > > >>>>>>>>>>> concept introduced in FLIP-309[2], this proposal enables > > sources > > >>>>>> to > > >>>>>>>>> report > > >>>>>>>>>>> their status of processing backlog based on the watermark lag. > > >>>>>>>>>>> > > >>>>>>>>>>> We would greatly appreciate any comments or feedback you may > > have > > >>>>>>> on > > >>>>>>>>> this > > >>>>>>>>>>> proposal. 
> > >>>>>>>>>>> > > >>>>>>>>>>> Best, > > >>>>>>>>>>> Xuannan > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> [1] > > >>>>>>>>>>> > > >>>>>>>>> > > >>>>>>> > > >>>>>> > > >>>> > > >> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag > > >>>> < > > >>>> > > >> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag > > >>>>> > > >>>>>>>>>>> [2] > > >>>>>>>>>>> > > >>>>>>>>> > > >>>>>>> > > >>>>>> > > >>>> > > >> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog > > >>>> < > > >>>> > > >> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog > > >> > > >> > > >> > > > >