Dong,

Thank you for the careful analysis of my proposal. Your conclusions make sense to me.

David

On Mon, Jul 24, 2023 at 8:37 PM Dong Lin <lindon...@gmail.com> wrote:
>
> Hi David,
>
> Thank you for the detailed comments and the suggestion of this alternative
> approach.
>
> I agree with you that this alternative can also address the target use-case
> with the same correctness. In comparison to the current FLIP, this
> alternative indeed introduces much less complexity to the Flink runtime
> internal implementation.
>
> At a high level, this alternative is simulating a one-time emission of
> Watermark(useProcessingTime=true) with periodic emission of
> Watermark(timestamp=wall-clock-time).
>
> One downside of this alternative is that it can introduce a bit of extra
> per-record runtime overhead. This is because the ingestion time watermark
> will be emitted periodically according to pipeline.auto-watermark-interval
> (200 ms by default). Thus there is still a short period where the watermark
> from the HybridSource can be lagging behind wall-clock time. Operators
> whose logic depends on the watermark, such as TemporalRowTimeJoinOperator,
> will need to check the build-side watermark and delay/buffer records on the
> probe side until they receive the next ingestion-time watermark.
>
> The impact of this overhead probably depends on the throughput/watermark of
> the probe-side records. On the other hand, given that the join operator is
> typically already heavy (due to state backend access and build-side buffer),
> and the watermark from probe-side (e.g. Kafka) is probably also lagging
> behind wall-clock time, it is probably not an issue in most cases. Therefore
> I agree that it is worth trying this approach. We can revisit this issue if
> we see any issues around performance or usability of this approach.
>
> Another potential concern is that it requires the user to use ingestion time.
> I am not sure we are able to do this in a backward-compatible way yet.
> We probably need to go through the existing APIs around ingestion time
> watermark to validate this.
>
> BTW, with the introduction of RecordAttributes(isBacklog=true/false) from
> FLIP-327, another short-term approach is to let
> TemporalProcessTimeJoinOperator keep buffering records from
> MySQL/HybridSource as long as isBacklog=true, and process them in a
> processing-time manner once it receives isBacklog=false. This should also
> address the use-case targeted by FLIP-326. The only caveat with this approach
> is that it is a bit hacky, because it requires JoinOperator to always buffer
> records when isBacklog=true, whereas isBacklog's semantics only say it is
> "optional" to buffer records, which can be an issue in the long term.
>
> Thanks,
> Dong
>
> On Tue, Jul 25, 2023 at 2:37 AM David Anderson <dander...@apache.org> wrote:
>>
>> I'm delighted to see interest in developing support for
>> processing-time temporal joins.
>>
>> The proposed implementation seems rather complex, and I'm not
>> convinced this complexity is justified/necessary. I'd like to outline
>> a simpler alternative that I think would satisfy the key objectives.
>>
>> Key ideas:
>>
>> 1. Limit support to the HybridSource (or a derivative thereof). (E.g.,
>> I'm guessing the MySQL CDC Source could be reworked to be a hybrid
>> source.)
>> 2. Have this HybridSource wait to begin emitting watermarks until it
>> has handled all events from the bounded sources. (I'm not sure how the
>> HybridSource handles this now; if this is an incompatible change, we
>> can find a way to deal with that.)
>> 3. Instruct users to use an ingestion time watermarking strategy for
>> their unbounded source (the source the HybridSource handles last) if
>> they want to do something like a processing time temporal join.
>>
>> One objection to this is the limitation of only supporting the
>> HybridSource -- what about cases where the user has a single source,
>> e.g., a Kafka topic?
>> I'm suggesting the user would divide their
>> build-side stream into two parts -- a bounded component that is fully
>> ingested by the hybrid source before watermarking begins, followed by
>> an unbounded component.
>>
>> I think this alternative handles use cases like processing-time
>> temporal join rather nicely, without requiring any changes to
>> watermarks or the core runtime.
>>
>> David
>>
>> On Thu, Jun 29, 2023 at 1:39 AM Martijn Visser <martijnvis...@apache.org>
>> wrote:
>> >
>> > Hi Dong and Xuannan,
>> >
>> > I'm excited to see this FLIP. I think support for processing-time
>> > temporal joins is something that the Flink users will greatly benefit
>> > from. I specifically want to call out that it's great to see the use
>> > cases that this enables. From a technical implementation perspective,
>> > I defer to the opinion of others with expertise on this topic.
>> >
>> > Best regards,
>> >
>> > Martijn
>> >
>> > On Sun, Jun 25, 2023 at 9:03 AM Xuannan Su <suxuanna...@gmail.com> wrote:
>> > >
>> > > Hi all,
>> > >
>> > > Dong(cc'ed) and I are opening this thread to discuss our proposal to
>> > > enhance the watermark to properly support processing-time temporal
>> > > join, which has been documented in FLIP-326 [1].
>> > >
>> > > We want to support the use case where the records from the probe side
>> > > of the processing-time temporal join need to wait until the build side
>> > > finishes the snapshot phase by enhancing the expressiveness of the
>> > > Watermark. Additionally, these changes lay the groundwork for
>> > > simplifying the DataStream APIs, eliminating the need for users to
>> > > explicitly differentiate between event-time and processing-time,
>> > > resulting in a more intuitive user experience.
>> > >
>> > > Please refer to the FLIP document for more details about the proposed
>> > > design and implementation. We welcome any feedback and opinions on
>> > > this proposal.
>> > >
>> > > Best regards,
>> > >
>> > > Dong and Xuannan
>> > >
>> > > [1]
>> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-326%3A+Enhance+Watermark+to+Support+Processing-Time+Temporal+Join
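
For readers following the thread, the alternative discussed above (withhold build-side watermarks during the bounded phase, then emit ingestion-time watermarks periodically while the join buffers probe-side records) can be illustrated with a small simulation. This is only a sketch in plain Python, not Flink code: TemporalJoinSim, run_demo and all method names are hypothetical, timestamps are made-up wall-clock milliseconds, and the join looks up the latest build value instead of the version valid at the record's timestamp. The 200 ms constant mirrors the pipeline.auto-watermark-interval default mentioned in the thread.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

# Mirrors the 200 ms default of pipeline.auto-watermark-interval cited above.
WATERMARK_INTERVAL_MS = 200


@dataclass
class TemporalJoinSim:
    """Toy join that holds probe records until the build-side watermark
    reaches their timestamp. Hypothetical model, not a Flink operator.
    Simplification: it joins against the latest build value per key."""
    build_state: Dict[str, str] = field(default_factory=dict)
    build_watermark: Optional[int] = None  # None: no watermark received yet
    buffered: List[Tuple[int, str]] = field(default_factory=list)
    output: List[Tuple[str, Optional[str]]] = field(default_factory=list)

    def on_build_record(self, key: str, value: str) -> None:
        self.build_state[key] = value

    def on_build_watermark(self, ts: int) -> None:
        # Release every buffered probe record the watermark has caught up to.
        self.build_watermark = ts
        ready = [r for r in self.buffered if r[0] <= ts]
        self.buffered = [r for r in self.buffered if r[0] > ts]
        for _, key in ready:
            self.output.append((key, self.build_state.get(key)))

    def on_probe_record(self, ts: int, key: str) -> None:
        if self.build_watermark is not None and ts <= self.build_watermark:
            self.output.append((key, self.build_state.get(key)))
        else:
            # Either the bounded phase is still running (no watermark yet)
            # or the periodic watermark lags behind this record.
            self.buffered.append((ts, key))


def run_demo() -> TemporalJoinSim:
    join = TemporalJoinSim()
    join.on_probe_record(50, "a")        # arrives before the snapshot is done
    join.on_build_record("a", "v1")      # bounded phase: records, no watermark
    join.on_build_record("b", "v1")
    join.on_build_watermark(WATERMARK_INTERVAL_MS)      # unbounded phase begins
    join.on_probe_record(250, "b")       # waits for the next periodic watermark
    join.on_build_watermark(2 * WATERMARK_INTERVAL_MS)
    join.on_probe_record(390, "a")       # already covered by watermark 400
    return join
```

In this model the probe record at 50 ms is held until the first watermark, and the record at 250 ms is held until the next periodic watermark arrives, which is the extra buffering overhead Dong describes; the wait is bounded by the watermark interval.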