Hi everyone,

I have been playing around with Flink SQL’s interval joins and noticed that some outputs from unmatched LEFT or FULL joins are arriving much later than I expected. Take the following query for example:

    SELECT *
    FROM orders o
    LEFT JOIN shipments s
      ON (o.orderID = s.orderID)
      AND o.rowtime BETWEEN s.rowtime - INTERVAL '1' HOUR
                        AND s.rowtime + INTERVAL '1' HOUR
I expect any unmatched records from orders to be output once the watermarks for both orders and shipments advance 1+ hours past the order record’s rowtime. However, I’m noticing that the watermarks actually need to advance by 2+ hours.

When I looked into this further, I found these formulas in the TimeIntervalJoin class [1]:

    minCleanUpInterval = (leftRelativeSize + rightRelativeSize) / 2
    long cleanUpTime = rowTime + leftRelativeSize + minCleanUpInterval + allowedLateness + 1;

In this case, the cleanUpTime equals rowtime + 1h + 1h + 0 + 1, which matches the 2+ hours I was observing.

From the commit history and documentation, I could not understand why the cleanUpTime is calculated this way. Why does the minCleanUpInterval exist, and why is its value the average of the left and right relative sizes? I found a similar JIRA issue opened a few years ago, FLINK-18996 [2].

This issue can negatively affect the performance of downstream jobs that ingest this output, because delaying the outputs of the interval join can leave the output stream very out of order. A downstream Flink job, for example, would have to increase its allowed lateness, or bounded out-of-orderness, to accommodate this delay.

Thanks,
Charles

[1] https://github.com/apache/flink/blob/ab70dcfa19827febd2c3cdc5cb81e942caa5b2f0/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalJoin.java#L366
[2] https://issues.apache.org/jira/browse/FLINK-18996
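
P.S. For reference, a minimal sketch of the arithmetic described above. It only restates the two quoted formulas in standalone Java and is not the Flink class itself. The class name CleanUpTimeSketch, the HOUR_MS constant, and the choice of milliseconds as the time unit are assumptions made for illustration.

```java
// Hypothetical standalone sketch; it restates the two formulas quoted from
// TimeIntervalJoin and does not depend on Flink.
public class CleanUpTimeSketch {

    // Assumption: timestamps are in milliseconds.
    static final long HOUR_MS = 60L * 60L * 1000L;

    // Average of the left and right relative sizes, as in the quoted formula.
    static long minCleanUpInterval(long leftRelativeSize, long rightRelativeSize) {
        return (leftRelativeSize + rightRelativeSize) / 2;
    }

    // Clean-up time for a left-side row, as in the quoted formula.
    static long cleanUpTime(long rowTime,
                            long leftRelativeSize,
                            long rightRelativeSize,
                            long allowedLateness) {
        return rowTime
                + leftRelativeSize
                + minCleanUpInterval(leftRelativeSize, rightRelativeSize)
                + allowedLateness
                + 1;
    }

    public static void main(String[] args) {
        long rowTime = 0L;          // order record's rowtime
        long allowedLateness = 0L;  // as in the example above

        // One-hour bounds on both sides, matching the query in this email.
        long t = cleanUpTime(rowTime, HOUR_MS, HOUR_MS, allowedLateness);

        // Delay relative to the row's own timestamp: 2h + 1 ms.
        System.out.println(t - rowTime); // prints 7200001
    }
}
```

With symmetric one-hour bounds the averaged term is itself one hour, which is where the second hour in the observed delay comes from.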