Hi Dong and Xuannan,

Thanks for your proposal! Processing-time temporal join is a very important
feature, and users have been waiting a long time for a proper
implementation of it.

However, I am wondering whether it is worth enhancing Watermark and
related classes in order to support this feature. As was mentioned earlier
in this thread, the proposed implementation looks unnecessarily
complicated. Moreover, I think that watermarks and processing time are
concepts that should not be mixed, because the original purpose of
watermarks is to track progress in event time. Having logic related to
processing time in the Watermark class could be confusing. I guess that
I'm not alone in having such concerns. Are there any other possible use
cases for the 'useProcessingTime' flag in the Watermark class?

Speaking of alternative approaches, the idea of using
RecordAttributes(isBacklog=true/false) from FLIP-327 looks much better to
me. Using the 'isBacklog' flag in TemporalProcessTimeJoinOperator would
clearly state why it needs to buffer probe-side data initially (because of
backlog data on the build side of the join). In addition, such an
implementation would be much easier, since it uses the already implemented
FLIP-327 as its basis. Dong mentioned that this approach could be a bit
hacky because RecordAttributes(isBacklog=true/false) was originally
proposed for optimization purposes ("optional" buffering). Can't we
generalize the semantics of the 'isBacklog' flag to simply notify
operators about the type of data that is currently being processed in the
stream (backlog / realtime)? In that case, there would no longer be a
semantic problem.
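
To make the idea more concrete, here is a rough sketch of the buffering
logic I have in mind. It is plain Java and deliberately does not use the
real Flink operator API: the class and all method names are hypothetical,
and I assume that the build side starts in backlog mode and that an inner
join is wanted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical, simplified model of a processing-time temporal join.
// None of these names are real Flink classes or methods.
class BacklogAwareTemporalJoinSketch {
    // Latest build-side value per key.
    private final Map<String, String> buildState = new HashMap<>();
    // Probe-side records held back while the build side is in backlog.
    private final Queue<String[]> bufferedProbe = new ArrayDeque<>();
    // Joined results in emission order, as "key:probeValue:buildValue".
    private final List<String> output = new ArrayList<>();
    // Assumption: the build side starts with backlog (snapshot) data.
    private boolean buildSideBacklog = true;

    // Stand-in for receiving RecordAttributes(isBacklog=...) on the build input.
    void onBuildSideBacklogChanged(boolean isBacklog) {
        buildSideBacklog = isBacklog;
        if (!isBacklog) {
            // Build side has caught up: drain buffered probe records in order.
            while (!bufferedProbe.isEmpty()) {
                String[] record = bufferedProbe.poll();
                join(record[0], record[1]);
            }
        }
    }

    void onBuildRecord(String key, String value) {
        buildState.put(key, value);
    }

    void onProbeRecord(String key, String value) {
        if (buildSideBacklog) {
            // Joining now could miss build rows that have not arrived yet.
            bufferedProbe.add(new String[] {key, value});
        } else {
            join(key, value);
        }
    }

    private void join(String key, String probeValue) {
        String buildValue = buildState.get(key);
        if (buildValue != null) { // inner join: drop unmatched probe records
            output.add(key + ":" + probeValue + ":" + buildValue);
        }
    }

    List<String> output() {
        return output;
    }
}
```

With this sketch, a probe record that arrives before the build side leaves
backlog is joined only after onBuildSideBacklogChanged(false) is called,
while later probe records are joined immediately against the latest build
state.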

Additionally, I have a slightly off-topic question. As far as I know, both
Processing Time Temporal Join and Lookup Join use the same 'FOR SYSTEM_TIME
AS OF' syntax in Flink SQL. How do you plan to differentiate between
these two kinds of join in the Flink planner after enabling this syntax for
Processing Time Temporal Join? I could propose checking for the presence of
the existing LOOKUP SQL hint [1] to enable Lookup Join, but such a change is
not backward compatible, so we need to come up with another approach. I
think this should also be mentioned in the FLIP.

Best regards,
Alexander

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#lookup


On 2023/06/25 07:02:00 Xuannan Su wrote:
> Hi all,
>
> Dong(cc'ed) and I are opening this thread to discuss our proposal to
> enhance the watermark to properly support processing-time temporal
> join, which has been documented in FLIP-326 [1].
>
> We want to support the use case where the records from the probe side
> of the processing-time temporal join need to wait until the build side
> finishes the snapshot phrase by enhancing the expressiveness of the
> Watermark. Additionally, these changes lay the groundwork for
> simplifying the DataStream APIs, eliminating the need for users to
> explicitly differentiate between event-time and processing-time,
> resulting in a more intuitive user experience.
>
> Please refer to the FLIP document for more details about the proposed
> design and implementation. We welcome any feedback and opinions on
> this proposal.
>
> Best regards,
>
> Dong and Xuannan
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-326%3A+Enhance+Watermark+to+Support+Processing-Time+Temporal+Join
>
