Dong,

Thank you for the careful analysis of my proposal. Your conclusions
make sense to me.

David

On Mon, Jul 24, 2023 at 8:37 PM Dong Lin <lindon...@gmail.com> wrote:
>
> Hi David,
>
> Thank you for the detailed comments and the suggestion of this alternative 
> approach.
>
> I agree with you that this alternative can also address the target use-case 
> with the same correctness. In comparison to the current FLIP, this 
> alternative indeed introduces much less complexity to the Flink runtime 
> internal implementation.
>
> At a high level, this alternative is simulating a one-time emission of 
> Watermark(useProcessingTime=true) with periodic emission of 
> Watermark(timestamp=wall-clock-time).
>
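
A minimal sketch of the simulation described above, in plain Java with no Flink dependency. The class and method names (IngestionTimeWatermarkSketch, onBoundedPhaseFinished, onPeriodicEmit) are hypothetical and are not Flink's API. The sketch only models the idea that no watermark is emitted while the bounded part is being read, and that afterwards each periodic tick emits the current wall-clock time.

```java
import java.util.OptionalLong;
import java.util.function.LongSupplier;

/** Hypothetical model, not Flink's API: withholds watermarks during the bounded phase. */
class IngestionTimeWatermarkSketch {
    private final LongSupplier clock;          // wall-clock source, injectable for testing
    private boolean boundedPhaseDone = false;  // true once the snapshot part is consumed

    IngestionTimeWatermarkSketch(LongSupplier clock) {
        this.clock = clock;
    }

    /** Called once the bounded (snapshot) part of the input has been fully handled. */
    void onBoundedPhaseFinished() {
        boundedPhaseDone = true;
    }

    /** Called on every periodic tick (assumed to be the auto-watermark interval). */
    OptionalLong onPeriodicEmit() {
        // Before the bounded phase ends there is no watermark at all;
        // afterwards the watermark is simply the current wall-clock time.
        return boundedPhaseDone ? OptionalLong.of(clock.getAsLong()) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        IngestionTimeWatermarkSketch gen =
                new IngestionTimeWatermarkSketch(System::currentTimeMillis);
        System.out.println("during snapshot: " + gen.onPeriodicEmit());
        gen.onBoundedPhaseFinished();
        System.out.println("after snapshot:  " + gen.onPeriodicEmit());
    }
}
```

Between two ticks the last emitted watermark is stale by up to one interval, which is where the lag discussed next comes from.
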
> One downside of this alternative is that it can introduce a bit of extra 
> per-record runtime overhead. This is because the ingestion time watermark 
> will be emitted periodically according to pipeline.auto-watermark-interval 
> (200 ms by default). Thus there is still a short period where the watermark 
> from the HybridSource can lag behind wall-clock time. Operators whose 
> logic depends on the watermark, such as TemporalRowTimeJoinOperator, will 
> need to check the build-side watermark and delay/buffer probe-side records 
> until they receive the next ingestion-time watermark.
>
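
To make the buffering cost concrete, here is a rough sketch in plain Java. ProbeSideBufferSketch and its methods are hypothetical names, not the actual TemporalRowTimeJoinOperator, and the sketch assumes probe-side timestamps arrive in non-decreasing order so that a FIFO queue is enough. A probe record is held until the build-side watermark has reached its timestamp.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of probe-side buffering; not the real join operator. */
class ProbeSideBufferSketch {
    // Probe timestamps in arrival order (assumed non-decreasing).
    private final Deque<Long> buffered = new ArrayDeque<>();
    private long buildSideWatermark = Long.MIN_VALUE;

    /** Buffers a probe record and returns the timestamps that can be joined now. */
    List<Long> onProbeRecord(long timestamp) {
        buffered.addLast(timestamp);
        return drain();
    }

    /** Advances the build-side watermark and releases records it now covers. */
    List<Long> onBuildSideWatermark(long watermark) {
        buildSideWatermark = Math.max(buildSideWatermark, watermark);
        return drain();
    }

    // Releases, in order, every buffered record at or below the watermark.
    private List<Long> drain() {
        List<Long> ready = new ArrayList<>();
        while (!buffered.isEmpty() && buffered.peekFirst() <= buildSideWatermark) {
            ready.add(buffered.pollFirst());
        }
        return ready;
    }

    public static void main(String[] args) {
        ProbeSideBufferSketch op = new ProbeSideBufferSketch();
        op.onBuildSideWatermark(1000L);
        System.out.println(op.onProbeRecord(1100L));        // held: watermark is behind
        System.out.println(op.onBuildSideWatermark(1200L)); // released by the next tick
    }
}
```

With wall-clock watermarks on the build side, a record waits at most about one emission interval in this model.
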
> The impact of this overhead probably depends on the throughput/watermark of 
> the probe-side records. On the other hand, given that the join operator is 
> typically already heavy (due to state backend access and build-side buffer), 
> and the watermark from probe-side (e.g. Kafka) is probably also lagging 
> behind wall-clock time, it is probably not an issue in most cases. Therefore 
> I agree that it is worth trying this approach. We can revisit this if we 
> see any issues around the performance or usability of this approach.
>
> Another potential concern is that it requires the user to use ingestion time. 
> I am not sure we are able to do this in a backward-compatible way yet. We 
> probably need to go through the existing APIs around ingestion time watermark 
> to validate this.
>
> BTW, with the introduction of RecordAttributes(isBacklog=true/false) from 
> FLIP-327, another short-term approach is to let 
> TemporalProcessTimeJoinOperator keep buffering records from 
> MySQL/HybridSource as long as isBacklog=true, and process them in a 
> processing-time manner once it receives isBacklog=false. This should also 
> address the use-case targeted by FLIP-326. The only caveat with this approach 
> is that it is a bit hacky, because it requires the join operator to always 
> buffer records when isBacklog=true, whereas isBacklog's semantics only say it 
> is "optional" to buffer records, which can be an issue in the long term.
>
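
A small sketch of the isBacklog-driven buffering described above, again in plain Java. BacklogBufferingSketch, onRecord, and onRecordAttributes are hypothetical names; this is not the FLIP-327 API or TemporalProcessTimeJoinOperator, only a model of "buffer while isBacklog=true, flush and process directly once isBacklog=false".

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of buffering driven by an isBacklog flag. */
class BacklogBufferingSketch {
    private final List<String> buffer = new ArrayList<>();
    private boolean isBacklog = false; // assumed default until an attribute arrives

    /** Returns the records to process now; empty while the backlog is being read. */
    List<String> onRecord(String record) {
        List<String> out = new ArrayList<>();
        if (isBacklog) {
            buffer.add(record);  // always buffer during backlog (the "hacky" part)
        } else {
            out.add(record);     // process immediately, processing-time style
        }
        return out;
    }

    /** Handles a change of the backlog flag; flushes the buffer when backlog ends. */
    List<String> onRecordAttributes(boolean backlog) {
        isBacklog = backlog;
        List<String> flushed = new ArrayList<>();
        if (!backlog) {
            flushed.addAll(buffer);
            buffer.clear();
        }
        return flushed;
    }

    public static void main(String[] args) {
        BacklogBufferingSketch op = new BacklogBufferingSketch();
        op.onRecordAttributes(true);
        op.onRecord("a");
        op.onRecord("b");
        System.out.println(op.onRecordAttributes(false)); // buffered records flushed
        System.out.println(op.onRecord("c"));             // processed directly
    }
}
```

The model depends on the operator buffering unconditionally while the flag is set, which is exactly the assumption that goes beyond the "optional" semantics.
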
> Thanks,
> Dong
>
> On Tue, Jul 25, 2023 at 2:37 AM David Anderson <dander...@apache.org> wrote:
>>
>> I'm delighted to see interest in developing support for
>> processing-time temporal joins.
>>
>> The proposed implementation seems rather complex, and I'm not
>> convinced this complexity is justified/necessary. I'd like to outline
>> a simpler alternative that I think would satisfy the key objectives.
>>
>> Key ideas:
>>
>> 1. Limit support to the HybridSource (or a derivative thereof). (E.g.,
>> I'm guessing the MySQL CDC Source could be reworked to be a hybrid
>> source.)
>> 2. Have this HybridSource wait to begin emitting watermarks until it
>> has handled all events from the bounded sources. (I'm not sure how the
>> HybridSource handles this now; if this is an incompatible change, we
>> can find a way to deal with that.)
>> 3. Instruct users to use an ingestion time watermarking strategy for
>> their unbounded source (the source the HybridSource handles last) if
>> they want to do something like a processing time temporal join.
>>
>> One objection to this is the limitation of only supporting the
>> HybridSource -- what about cases where the user has a single source,
>> e.g., a Kafka topic? I'm suggesting the user would divide their
>> build-side stream into two parts -- a bounded component that is fully
>> ingested by the hybrid source before watermarking begins, followed by
>> an unbounded component.
>>
>> I think this alternative handles use cases like processing-time
>> temporal join rather nicely, without requiring any changes to
>> watermarks or the core runtime.
>>
>> David
>>
>> On Thu, Jun 29, 2023 at 1:39 AM Martijn Visser <martijnvis...@apache.org> 
>> wrote:
>> >
>> > Hi Dong and Xuannan,
>> >
>> > I'm excited to see this FLIP. I think support for processing-time
>> > temporal joins is something that Flink users will greatly benefit
>> > from. I specifically want to call out that it's great to see the use
>> > cases that this enables. From a technical implementation perspective,
>> > I defer to the opinion of others with expertise on this topic.
>> >
>> > Best regards,
>> >
>> > Martijn
>> >
>> > On Sun, Jun 25, 2023 at 9:03 AM Xuannan Su <suxuanna...@gmail.com> wrote:
>> > >
>> > > Hi all,
>> > >
>> > > Dong (cc'ed) and I are opening this thread to discuss our proposal to
>> > > enhance the watermark to properly support processing-time temporal
>> > > join, which has been documented in FLIP-326 [1].
>> > >
>> > > We want to support the use case where the records from the probe side
>> > > of the processing-time temporal join need to wait until the build side
>> > > finishes the snapshot phase by enhancing the expressiveness of the
>> > > Watermark. Additionally, these changes lay the groundwork for
>> > > simplifying the DataStream APIs, eliminating the need for users to
>> > > explicitly differentiate between event-time and processing-time,
>> > > resulting in a more intuitive user experience.
>> > >
>> > > Please refer to the FLIP document for more details about the proposed
>> > > design and implementation. We welcome any feedback and opinions on
>> > > this proposal.
>> > >
>> > > Best regards,
>> > >
>> > > Dong and Xuannan
>> > >
>> > > [1] 
>> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-326%3A+Enhance+Watermark+to+Support+Processing-Time+Temporal+Join
