Hi Dong and Xuannan,

Thanks for your proposal! Processing-time temporal join is a very important feature, and users have been waiting a long time for a proper implementation of it.
However, I wonder whether it is worth enhancing Watermark and related classes to support this feature. As was mentioned earlier in this thread, the proposed implementation looks unnecessarily complicated. Moreover, I think watermarks and processing time are concepts that should not be mixed, because the original purpose of watermarks is to track progress in event time. Having processing-time logic in the Watermark class could be confusing, and I suspect I'm not alone in this concern. Are there any other use cases for the 'useProcessingTime' flag in the Watermark class?

As for alternative approaches, the idea of using RecordAttributes(isBacklog=true/false) from FLIP-327 looks much better to me. Using the 'isBacklog' flag in TemporalProcessTimeJoinOperator would make it clear why the operator needs to buffer probe-side data initially: the build side of the join is still emitting backlog data. In addition, such an implementation would be much simpler, since it builds on the already implemented FLIP-327.

Dong mentioned that this approach could be a bit hacky, because RecordAttributes(isBacklog=true/false) was originally proposed for optimization purposes ("optional" buffering). Can't we generalize the semantics of the 'isBacklog' flag so that it simply notifies operators about the type of data currently being processed in the stream (backlog or real-time)? In that case, there would no longer be a semantic problem.

Additionally, I have a slightly off-topic question. As far as I know, both the processing-time temporal join and the lookup join use the same 'FOR SYSTEM_TIME AS OF' syntax in Flink SQL. How do you plan to differentiate between these two kinds of join in the Flink planner once this syntax is enabled for the processing-time temporal join? I could propose checking for the presence of the existing LOOKUP SQL hint [1] to enable the lookup join, but such a change is not backward compatible, so we need to come up with another approach.
I think this should also be mentioned in the FLIP.

Best regards,
Alexander

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#lookup

On 2023/06/25 07:02:00 Xuannan Su wrote:
> Hi all,
>
> Dong (cc'ed) and I are opening this thread to discuss our proposal to
> enhance the watermark to properly support processing-time temporal
> join, which has been documented in FLIP-326 [1].
>
> We want to support the use case where the records from the probe side
> of the processing-time temporal join need to wait until the build side
> finishes the snapshot phase by enhancing the expressiveness of the
> Watermark. Additionally, these changes lay the groundwork for
> simplifying the DataStream APIs, eliminating the need for users to
> explicitly differentiate between event-time and processing-time,
> resulting in a more intuitive user experience.
>
> Please refer to the FLIP document for more details about the proposed
> design and implementation. We welcome any feedback and opinions on
> this proposal.
>
> Best regards,
>
> Dong and Xuannan
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-326%3A+Enhance+Watermark+to+Support+Processing-Time+Temporal+Join