Hi David,

Thank you for the detailed comments and the suggestion of this alternative
approach.

I agree with you that this alternative can also address the targeted use
case with the same correctness. Compared to the current FLIP, this
alternative indeed introduces much less complexity into the Flink runtime's
internal implementation.

At a high level, this alternative simulates a one-time emission of
Watermark(useProcessingTime=true) by periodically emitting
Watermark(timestamp=wall-clock-time).
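To make this concrete, here is a minimal Java sketch of the watermark
behaviour David describes. HybridWatermarkEmitter, onBoundedSourcesFinished()
and onPeriodicEmit() are hypothetical names, not Flink APIs, and the clock is
injected only so the sketch can be tested. No watermark is emitted while the
bounded sources are being read; afterwards every periodic tick emits the
current wall-clock time, so the first emitted watermark plays the role of
Watermark(useProcessingTime=true).

```java
import java.util.OptionalLong;
import java.util.function.LongSupplier;

/**
 * Hypothetical model (not a Flink API) of the proposed HybridSource behaviour:
 * hold back watermarks while the bounded sources are read, then emit
 * Watermark(timestamp = wall-clock time) on every periodic tick.
 */
final class HybridWatermarkEmitter {
    private final LongSupplier wallClockMs;   // e.g. System::currentTimeMillis
    private boolean boundedPhaseFinished = false;
    private long lastWatermark = Long.MIN_VALUE;

    HybridWatermarkEmitter(LongSupplier wallClockMs) {
        this.wallClockMs = wallClockMs;
    }

    /** Called once the last bounded source (e.g. the snapshot) is fully consumed. */
    void onBoundedSourcesFinished() {
        boundedPhaseFinished = true;
    }

    /**
     * Called every pipeline.auto-watermark-interval (200 ms by default).
     * Returns the watermark to emit, or empty while still in the bounded phase.
     */
    OptionalLong onPeriodicEmit() {
        if (!boundedPhaseFinished) {
            return OptionalLong.empty();   // snapshot still being ingested
        }
        // Watermarks must never go backwards, even if the wall clock is adjusted.
        lastWatermark = Math.max(lastWatermark, wallClockMs.getAsLong());
        return OptionalLong.of(lastWatermark);
    }
}
```

The Math.max guard is a design choice of this sketch: watermarks are expected
to be non-decreasing, while System.currentTimeMillis() can jump backwards if
the system clock is adjusted.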

One downside of this alternative is that it can introduce a bit of extra
per-record runtime overhead. This is because the ingestion-time watermark
is only emitted periodically, according to pipeline.auto-watermark-interval
(200 ms by default), so there are short periods where the watermark from the
HybridSource lags behind wall-clock time. Operators whose logic depends on
the watermark, such as TemporalRowTimeJoinOperator, will need to check the
build-side watermark and delay/buffer probe-side records until they receive
the next ingestion-time watermark.
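A minimal sketch of this probe-side buffering, assuming probe-side records
carry ingestion timestamps that arrive in non-decreasing order, and that a
record may be joined once the build-side watermark has reached its
timestamp. ProbeRecord and ProbeSideBuffer are hypothetical names; this is
not the actual TemporalRowTimeJoinOperator logic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** A probe-side record stamped with its ingestion (wall-clock) time in ms. */
final class ProbeRecord {
    final String key;
    final long timestampMs;

    ProbeRecord(String key, long timestampMs) {
        this.key = key;
        this.timestampMs = timestampMs;
    }
}

/**
 * Hypothetical model of watermark-gated probe-side buffering: a probe record
 * is only released once the build-side watermark has reached its timestamp.
 */
final class ProbeSideBuffer {
    private final Deque<ProbeRecord> pending = new ArrayDeque<>();
    private long buildSideWatermark = Long.MIN_VALUE;

    /** Buffers the record and returns every record that can be joined now. */
    List<ProbeRecord> onProbeRecord(ProbeRecord record) {
        pending.addLast(record);
        return drainReady();
    }

    /** Advances the build-side watermark and returns the records it releases. */
    List<ProbeRecord> onBuildSideWatermark(long watermarkMs) {
        buildSideWatermark = Math.max(buildSideWatermark, watermarkMs);
        return drainReady();
    }

    int pendingCount() {
        return pending.size();
    }

    // Records are assumed to arrive in timestamp order, so scanning from the head suffices.
    private List<ProbeRecord> drainReady() {
        List<ProbeRecord> ready = new ArrayList<>();
        while (!pending.isEmpty() && pending.peekFirst().timestampMs <= buildSideWatermark) {
            ready.add(pending.pollFirst());
        }
        return ready;
    }
}
```

For example, with build-side watermarks at 1000 ms and 1200 ms, a probe
record stamped 1050 ms is held for 150 ms. Under these assumptions the extra
wait is bounded by roughly one auto-watermark interval.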

The impact of this overhead probably depends on the throughput and watermark
of the probe-side records. On the other hand, given that the join operator is
typically already heavy (due to state backend access and build-side
buffering), and that the watermark from the probe side (e.g. Kafka) probably
also lags behind wall-clock time, this is probably not an issue in most cases.
Therefore I agree that it is worth trying this approach. We can revisit
this if we encounter any issues around the performance or usability of this
approach.

Another potential concern is that it requires users to use ingestion time.
I am not sure yet whether we can do this in a backward-compatible way. We
probably need to go through the existing APIs around the ingestion-time
watermark to validate this.

BTW, with the introduction of RecordAttributes(isBacklog=true/false) from
FLIP-327
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data>,
another short-term approach is to let TemporalProcessTimeJoinOperator keep
buffering records from MySQL/HybridSource as long as isBacklog=true, and
process them in a processing-time manner once it receives isBacklog=false.
This should also address the use case targeted by FLIP-326. The only caveat
is that this approach is a bit hacky: it requires the join operator to
always buffer records when isBacklog=true, whereas the semantics of
isBacklog only say that buffering records is "optional", which can become
an issue in the long term.
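A minimal sketch of this isBacklog-based approach, under one possible
reading: the operator applies build-side records to its state as they
arrive, holds probe-side records while the build side reports
isBacklog=true, and joins them against the current build-side table once
isBacklog=false arrives. BacklogAwareJoin and its methods are hypothetical,
not the actual TemporalProcessTimeJoinOperator, and the unbounded in-memory
list stands in for whatever buffering a real operator would use.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a processing-time join gated on the build side's isBacklog. */
final class BacklogAwareJoin {
    private final Map<String, String> buildTable = new HashMap<>();
    private final List<String> heldProbeKeys = new ArrayList<>();
    private final List<String> output = new ArrayList<>();
    private boolean buildSideBacklog = true;   // assume backlog until told otherwise

    /** Upserts the latest version of a build-side (dimension) row. */
    void onBuildRecord(String key, String value) {
        buildTable.put(key, value);
    }

    void onProbeRecord(String key) {
        if (buildSideBacklog) {
            heldProbeKeys.add(key);   // snapshot not finished: joining now could miss rows
        } else {
            join(key);                // processing-time semantics: use the current table
        }
    }

    /** Models receiving RecordAttributes(isBacklog=...) from the build side. */
    void onBuildSideBacklogStatus(boolean isBacklog) {
        boolean wasBacklog = buildSideBacklog;
        buildSideBacklog = isBacklog;
        if (wasBacklog && !isBacklog) {
            for (String key : heldProbeKeys) {
                join(key);            // flush held records against the complete table
            }
            heldProbeKeys.clear();
        }
    }

    // Left-join style output: "key=value", or "key=null" when there is no match.
    private void join(String key) {
        String value = buildTable.get(key);
        output.add(key + "=" + (value == null ? "null" : value));
    }

    List<String> output() {
        return output;
    }
}
```

Without the hold, a probe record arriving before the snapshot finishes would
be joined against a table that may not yet contain its key and produce a
null match. The sketch also reflects the caveat above: correctness relies on
the operator always buffering while isBacklog=true.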

Thanks,
Dong

On Tue, Jul 25, 2023 at 2:37 AM David Anderson <dander...@apache.org> wrote:

> I'm delighted to see interest in developing support for
> processing-time temporal joins.
>
> The proposed implementation seems rather complex, and I'm not
> convinced this complexity is justified/necessary. I'd like to outline
> a simpler alternative that I think would satisfy the key objectives.
>
> Key ideas:
>
> 1. Limit support to the HybridSource (or a derivative thereof). (E.g.,
> I'm guessing the MySQL CDC Source could be reworked to be a hybrid
> source.)
> 2. Have this HybridSource wait to begin emitting watermarks until it
> has handled all events from the bounded sources. (I'm not sure how the
> HybridSource handles this now; if this is an incompatible change, we
> can find a way to deal with that.)
> 3. Instruct users to use an ingestion time watermarking strategy for
> their unbounded source (the source the HybridSource handles last) if
> they want to do something like a processing time temporal join.
>
> One objection to this is the limitation of only supporting the
> HybridSource -- what about cases where the user has a single source,
> e.g., a Kafka topic? I'm suggesting the user would divide their
> build-side stream into two parts -- a bounded component that is fully
> ingested by the hybrid source before watermarking begins, followed by
> an unbounded component.
>
> I think this alternative handles use cases like processing-time
> temporal join rather nicely, without requiring any changes to
> watermarks or the core runtime.
>
> David
>
> On Thu, Jun 29, 2023 at 1:39 AM Martijn Visser <martijnvis...@apache.org>
> wrote:
> >
> > Hi Dong and Xuannan,
> >
> > I'm excited to see this FLIP. I think support for processing-time
> > temporal joins is something that Flink users will greatly benefit
> > from. I specifically want to call out that it's great to see the use
> > cases that this enables. From a technical implementation perspective,
> > I defer to the opinion of others with expertise on this topic.
> >
> > Best regards,
> >
> > Martijn
> >
> > On Sun, Jun 25, 2023 at 9:03 AM Xuannan Su <suxuanna...@gmail.com>
> wrote:
> > >
> > > Hi all,
> > >
> > > Dong(cc'ed) and I are opening this thread to discuss our proposal to
> > > enhance the watermark to properly support processing-time temporal
> > > join, which has been documented in FLIP-326 [1].
> > >
> > > We want to support the use case where the records from the probe side
> > > of the processing-time temporal join need to wait until the build side
> > > finishes the snapshot phase by enhancing the expressiveness of the
> > > Watermark. Additionally, these changes lay the groundwork for
> > > simplifying the DataStream APIs, eliminating the need for users to
> > > explicitly differentiate between event-time and processing-time,
> > > resulting in a more intuitive user experience.
> > >
> > > Please refer to the FLIP document for more details about the proposed
> > > design and implementation. We welcome any feedback and opinions on
> > > this proposal.
> > >
> > > Best regards,
> > >
> > > Dong and Xuannan
> > >
> > > [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-326%3A+Enhance+Watermark+to+Support+Processing-Time+Temporal+Join
>
