Hi Jing,

Thank you for the suggestion.

The definition of watermark lag is the same as the watermarkLag metric in
FLIP-33[1]. More specifically, the watermark lag is computed at the time
when a watermark is emitted downstream, as follows: watermarkLag =
CurrentTime - Watermark. I have added this description to the FLIP.
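
To make the formula concrete, here is a minimal sketch in Java. The class and
method names (WatermarkLagSketch, watermarkLag, isProcessingBacklog) are
hypothetical and are not Flink APIs. The threshold comparison is likewise an
assumption about how a lag threshold could be applied, based on the
"lags behind by more than 1 hour" example earlier in this thread; the FLIP
itself is the authority on the actual behavior.

```java
import java.time.Duration;

// Hypothetical helper for illustration only; not part of Flink's API.
final class WatermarkLagSketch {

    // watermarkLag = CurrentTime - Watermark, both in epoch milliseconds.
    // Intended to be evaluated at the moment a watermark is emitted downstream.
    static long watermarkLag(long currentTimeMillis, long watermarkMillis) {
        return currentTimeMillis - watermarkMillis;
    }

    // Assumed rule: treat the source as processing backlog when the lag
    // is strictly greater than the configured threshold.
    static boolean isProcessingBacklog(long lagMillis, Duration threshold) {
        return lagMillis > threshold.toMillis();
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // Pretend the watermark being emitted is two hours behind wall-clock time.
        long watermark = now - Duration.ofHours(2).toMillis();
        long lag = watermarkLag(now, watermark);
        boolean backlog = isProcessingBacklog(lag, Duration.ofHours(1));
        System.out.println("lagMillis=" + lag + ", isProcessingBacklog=" + backlog);
    }
}
```

With a one-hour threshold, a watermark that is two hours behind the current
time would count as backlog in this sketch, while one that is five minutes
behind would not.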

I hope this addresses your concern.

Best, 
Xuannan

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics


> On Aug 28, 2023, at 01:04, Jing Ge <j...@ververica.com.INVALID> wrote:
> 
> Hi Xuannan,
> 
> Thanks for the proposal. +1 for me.
> 
> There is one tiny thing that I am not sure I understand correctly. Since
> there will be many different WatermarkStrategies and WatermarkGenerators,
> could you please update the FLIP and add a description of exactly how the
> watermark lag is calculated? E.g. Watermark lag = A - B, where A is the
> timestamp of the watermark emitted downstream and B is.... (this is the
> part I am not really sure about after reading the FLIP).
> 
> Best regards,
> Jing
> 
> 
> On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su <suxuanna...@gmail.com> wrote:
> 
>> Hi Jark,
>> 
>> Thanks for the comments.
>> 
>> I agree that the current solution cannot support jobs that cannot define
>> watermarks. However, after considering the pending-record-based solution, I
>> believe the current solution is superior for the target use case as it is
>> more intuitive for users. The backlog status gives users the ability to
>> balance between throughput and latency. Making this trade-off decision
>> based on the watermark lag is more intuitive from the user's perspective.
>> For instance, a user can decide that if the job lags behind the current
>> time by more than 1 hour, the result is not usable. In that case, we can
>> optimize for throughput when the data lags behind by more than an hour.
>> With the pending-record-based solution, it's challenging for users to
>> determine when to optimize for throughput and when to prioritize latency.
>> 
>> Regarding the limitations of the watermark-based solution:
>> 
>> 1. The current solution can support jobs with sources that have event
>> time. Users can always define a watermark at the source operator, even if
>> it's not used by downstream operators, such as streaming join and unbounded
>> aggregate.
>> 
>> 2. I don't believe it's accurate to say that the watermark lag will keep
>> increasing if no data is generated in Kafka. The watermark lag and backlog
>> status are determined at the moment when the watermark is emitted to the
>> downstream operator. If no data is emitted from the source, the watermark
>> lag and backlog status will not be updated. If the WatermarkStrategy with
>> idleness is used, the source becomes non-backlog when it becomes idle.
>> 
>> 3. I think watermark lag is a more intuitive way to determine whether a job
>> is processing backlog data. The pending-records approach faces a similar
>> issue. For example, if the source has 1K pending records, those records
>> can span anywhere from 1 day to 1 hour to 1 second. If the records span 1
>> day, it's probably best to optimize for throughput. If they span 1 hour, it
>> depends on the business logic. If they span 1 second, optimizing for
>> latency is likely the better choice.
>> 
>> In summary, I believe the watermark-based solution is a superior choice
>> for the target use case where watermark/event time can be defined.
>> Additionally, I haven't come across a scenario that requires low-latency
>> processing and reads from a source that cannot define watermarks. If we
>> encounter such a use case, we can create another FLIP to address those
>> needs in the future. What do you think?
>> 
>> 
>> Best,
>> Xuannan
>> 
>> 
>> 
>>> On Aug 20, 2023, at 23:27, Jark Wu <imj...@gmail.com> wrote:
>>> 
>>> Hi Xuannan,
>>> 
>>> Thanks for opening this discussion.
>>> 
>>> This current proposal may work in the mentioned watermark cases.
>>> However, it seems this is not a general solution for sources to determine
>>> "isProcessingBacklog".
>>> From my point of view, there are 3 limitations of the current proposal:
>>> 1. It doesn't cover jobs that don't have watermark/event-time defined,
>>> for example streaming join and unbounded aggregate. We may still need to
>>> figure out solutions for them.
>>> 2. Watermark lag cannot be trusted, because it increases without bound if
>>> no data is generated in Kafka.
>>> But in this case, there is no backlog at all.
>>> 3. Watermark lag does not reliably reflect the amount of backlog. Whether
>>> the watermark lag is 1 day, 1 hour, or 1 second,
>>> there may be only 1 pending record, which means no backlog at
>>> all.
>>> 
>>> Therefore, IMO, watermark may not be the ideal metric for determining
>>> "isProcessingBacklog".
>>> What we need is something that reflects the number of records unprocessed
>>> by the job.
>>> Actually, that is the "pendingRecords" metric proposed in FLIP-33, which
>>> has been implemented by the Kafka source.
>>> Did you consider using the "pendingRecords" metric to determine
>>> "isProcessingBacklog"?
>>> 
>>> Best,
>>> Jark
>>> 
>>> 
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>>> 
>>> 
>>> 
>>> 
>>> On Tue, 15 Aug 2023 at 12:04, Xintong Song <tonysong...@gmail.com> wrote:
>>> 
>>>> Sounds good to me.
>>>> 
>>>> It is true that, if we are introducing the generalized watermark, there
>>>> will be other watermark related concepts / configurations that need to be
>>>> updated anyway.
>>>> 
>>>> 
>>>> Best,
>>>> 
>>>> Xintong
>>>> 
>>>> 
>>>> 
>>>> On Tue, Aug 15, 2023 at 11:30 AM Xuannan Su <suxuanna...@gmail.com> wrote:
>>>> 
>>>>> Hi Xintong,
>>>>> 
>>>>> Thank you for your suggestion.
>>>>> 
>>>>> After considering the idea of using a general configuration key, I think
>>>>> it may not be a good idea for the reasons below.
>>>>> 
>>>>> While I agree that using a more general configuration key provides us with
>>>>> the flexibility to switch to other approaches to calculate the lag in the
>>>>> future, the downside is that it may cause confusion for users. We currently
>>>>> have fetchEventTimeLag, emitEventTimeLag, and watermarkLag in the source,
>>>>> and it is not clear which specific lag we are referring to. With the
>>>>> potential introduction of the Generalized Watermark mechanism in the
>>>>> future, if I understand correctly, a watermark won't necessarily need to be
>>>>> a timestamp. I am concerned that the general configuration key may not be
>>>>> enough to cover all the use cases and we will need to introduce a general
>>>>> way to determine the backlog status regardless.
>>>>> 
>>>>> For the reasons above, I prefer introducing the configuration as is, and
>>>>> changing it later with a deprecation or migration process. What
>>>>> do you think?
>>>>> 
>>>>> Best,
>>>>> Xuannan
>>>>> On Aug 14, 2023, 14:09 +0800, Xintong Song <tonysong...@gmail.com>, wrote:
>>>>>> Thanks for the explanation.
>>>>>> 
>>>>>> I wonder if it makes sense to not expose this detail via the configuration
>>>>>> option. To be specific, I suggest not mentioning the "watermark" keyword in
>>>>>> the configuration key and description.
>>>>>> 
>>>>>> - From the users' perspective, I think they only need to know that when
>>>>>> the lag is higher than the given threshold, Flink will consider latency of
>>>>>> individual records as less important and prioritize throughput over it.
>>>>>> They don't really need the details of how the lags are calculated.
>>>>>> - For the internal implementation, I also think using watermark lags is
>>>>>> a good idea, for the reasons you've already mentioned. However, it's not
>>>>>> the only possible option. Hiding this detail from users would give us the
>>>>>> flexibility to switch to other approaches if needed in the future.
>>>>>> - We are currently working on designing the ProcessFunction API
>>>>>> (consider it as a DataStream API V2). There's an idea to introduce a
>>>>>> Generalized Watermark mechanism, where basically the watermark can be
>>>>>> anything that needs to travel along the data-flow with certain alignment
>>>>>> strategies, and event time watermark would be one specific case of it. This
>>>>>> is still an idea and has not been discussed and agreed on by the community,
>>>>>> and we are preparing a FLIP for it. But if we are going for it, the concept
>>>>>> "watermark-lag-threshold" could be ambiguous.
>>>>>> 
>>>>>> I do not intend to block the FLIP on this. I'd also be fine with
>>>>>> introducing the configuration as is, and changing it later, if needed, with
>>>>>> a regular deprecation and migration process. Just making my suggestions.
>>>>>> 
>>>>>> 
>>>>>> Best,
>>>>>> 
>>>>>> Xintong
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Mon, Aug 14, 2023 at 12:00 PM Xuannan Su <suxuanna...@gmail.com> wrote:
>>>>>> 
>>>>>>> Hi Xintong,
>>>>>>> 
>>>>>>> Thanks for the reply.
>>>>>>> 
>>>>>>> I have considered using the timestamp in the records to determine the
>>>>>>> backlog status, and decided to use the watermark in the end. By definition,
>>>>>>> the watermark is the time progress indication in the data stream. It
>>>>>>> indicates that the stream’s event time has progressed to some specific
>>>>>>> time. On the other hand, the timestamp in the records is usually used to
>>>>>>> generate the watermark. Therefore, it appears more appropriate and
>>>>>>> intuitive to calculate the event time lag from the watermark and use it to
>>>>>>> determine the backlog status. And by using the watermark, we can easily
>>>>>>> deal with out-of-orderness and idleness of the data.
>>>>>>> 
>>>>>>> Please let me know if you have further questions.
>>>>>>> 
>>>>>>> Best,
>>>>>>> Xuannan
>>>>>>> On Aug 10, 2023, 20:23 +0800, Xintong Song <tonysong...@gmail.com>, wrote:
>>>>>>>> Thanks for preparing the FLIP, Xuannan.
>>>>>>>> 
>>>>>>>> +1 in general.
>>>>>>>> 
>>>>>>>> A quick question, could you explain why we are relying on the watermark for
>>>>>>>> emitting the record attribute? Why not use timestamps in the records? I
>>>>>>>> don't see any concern in using watermarks. Just wondering if there are any
>>>>>>>> deeper considerations behind this.
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> 
>>>>>>>> Xintong
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su <suxuanna...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>>> Hi all,
>>>>>>>>> 
>>>>>>>>> I am opening this thread to discuss FLIP-328: Allow source operators to
>>>>>>>>> determine isProcessingBacklog based on watermark lag[1]. We had several
>>>>>>>>> discussions with Dong Ling about the design, and thanks for all the
>>>>>>>>> valuable advice.
>>>>>>>>> 
>>>>>>>>> The FLIP targets the use case where users want to run a Flink job to
>>>>>>>>> backfill historical data in a high-throughput manner and then continue
>>>>>>>>> processing real-time data with low latency. Building upon the backlog
>>>>>>>>> concept introduced in FLIP-309[2], this proposal enables sources to report
>>>>>>>>> their status of processing backlog based on the watermark lag.
>>>>>>>>> 
>>>>>>>>> We would greatly appreciate any comments or feedback you may have on this
>>>>>>>>> proposal.
>>>>>>>>> 
>>>>>>>>> Best,
>>>>>>>>> Xuannan
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> [1]
>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
>>>>>>>>> [2]
>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog

