Hi everyone,

I have been playing around with Flink SQL’s interval joins and noticed that
the outputs for unmatched records in LEFT or FULL joins are arriving much
later than I expected. Take the following query, for example:
    SELECT * FROM orders o
    LEFT JOIN shipments s
      ON (o.orderID = s.orderID)
      AND o.rowtime BETWEEN s.rowtime - INTERVAL '1' HOUR
                        AND s.rowtime + INTERVAL '1' HOUR

I expect any unmatched records from orders to be output once the watermark
for both orders and shipments advances 1+ hours past the order record’s
rowtime. However, I’m noticing that the watermarks actually need to advance
by 2+ hours. When I looked into this further, I found these formulas in the
TimeIntervalJoin class [1]:
    minCleanUpInterval = (leftRelativeSize + rightRelativeSize) / 2;
    long cleanUpTime = rowTime + leftRelativeSize + minCleanUpInterval + allowedLateness + 1;

In this case, leftRelativeSize and rightRelativeSize are both 1h and
allowedLateness is 0, so minCleanUpInterval = (1h + 1h) / 2 = 1h and the
cleanUpTime equals rowtime + 1h + 1h + 0 + 1 ms. That matches the 2+ hours I
was observing. From the commit history and documentation, I could not
understand why the cleanUpTime is calculated this way. Why does the
minCleanUpInterval exist, and why is its value the average of the left and
right relative sizes? I found a similar JIRA
issue opened a few years ago, FLINK-18996 [2]. This delay can negatively
affect downstream jobs that consume this output, because holding back the
interval join's unmatched outputs can leave the output stream heavily out of
order. A downstream Flink job, for example, would have to increase its
allowed lateness or bounded out-of-orderness to accommodate the delay.
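To double-check the arithmetic, here is a small standalone Java sketch that plugs the example query's bounds into the cleanup formula quoted from TimeIntervalJoin. It is not Flink code: the class and method names are hypothetical, both relative sizes are assumed to be 1 hour (as in the 1h + 1h above), and expectedEmitTime is simply the same formula with minCleanUpInterval removed, i.e. the bound I expected.

```java
import java.time.Duration;

// Hypothetical standalone sketch (not Flink code): reproduces the cleanup-time
// arithmetic quoted from TimeIntervalJoin for the example LEFT interval join.
public class IntervalJoinCleanupDelay {

    // Cleanup timer for a left (orders) row, following the two quoted formulas.
    static long cleanUpTime(long rowTime, long leftRelativeSize,
                            long rightRelativeSize, long allowedLateness) {
        long minCleanUpInterval = (leftRelativeSize + rightRelativeSize) / 2;
        return rowTime + leftRelativeSize + minCleanUpInterval + allowedLateness + 1;
    }

    // Assumption: the bound I expected, i.e. the same formula without
    // the minCleanUpInterval term.
    static long expectedEmitTime(long rowTime, long leftRelativeSize, long allowedLateness) {
        return rowTime + leftRelativeSize + allowedLateness + 1;
    }

    public static void main(String[] args) {
        long hour = Duration.ofHours(1).toMillis(); // 3,600,000 ms
        long rowTime = 0L;             // order rowtime in epoch millis, 0 for readability
        long leftRelativeSize = hour;  // assumed: both bounds in the query are 1 HOUR
        long rightRelativeSize = hour;
        long allowedLateness = 0L;

        long actual = cleanUpTime(rowTime, leftRelativeSize, rightRelativeSize, allowedLateness);
        long expected = expectedEmitTime(rowTime, leftRelativeSize, allowedLateness);

        System.out.println("cleanUpTime (ms): " + actual);
        System.out.println("expected (ms):    " + expected);
        System.out.println("extra delay (ms): " + (actual - expected));
    }
}
```

Running it prints 7200001, 3600001 and 3600000 ms. The extra hour is exactly minCleanUpInterval, which is roughly the additional delay a downstream job would have to absorb.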

Thanks,
Charles

[1] https://github.com/apache/flink/blob/ab70dcfa19827febd2c3cdc5cb81e942caa5b2f0/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalJoin.java#L366
[2] https://issues.apache.org/jira/browse/FLINK-18996
